From f36725a7c33e4b4874f9598db1ed9d0f5f9a217b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Montilla?= Date: Tue, 24 Oct 2023 12:46:02 -0400 Subject: [PATCH] feat: LTI 1.3 reusable configuration (#390) Co-authored-by: Squirrel18 Co-authored-by: alexjmpb --- CHANGELOG.rst | 4 + README.rst | 25 ++++ lti_consumer/__init__.py | 2 +- lti_consumer/api.py | 10 +- lti_consumer/exceptions.py | 7 + lti_consumer/lti_xblock.py | 22 ++-- lti_consumer/models.py | 73 ++++++---- lti_consumer/plugin/urls.py | 14 ++ lti_consumer/plugin/views.py | 106 ++++++++++++--- lti_consumer/static/js/xblock_studio_view.js | 14 +- lti_consumer/tests/unit/plugin/test_views.py | 97 +++++++++++++- lti_consumer/tests/unit/test_api.py | 39 ++++++ lti_consumer/tests/unit/test_lti_xblock.py | 69 +++++++++- lti_consumer/tests/unit/test_models.py | 132 +++++++++++++------ lti_consumer/utils.py | 4 + 15 files changed, 507 insertions(+), 111 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ac64813c..96983ea7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,10 @@ Please See the `releases tab `_ for testing the LTI 2.0 Result Service 2.0 implementation. +LTI Reusable configuration +************************** + +The LTI Consumer XBlock supports configuration reusability via plugins. +It is compatible with both LTI 1.1 and LTI 1.3. +All values (including the access token and keyset URL for LTI 1.3) +are shared across the XBlocks with the same external configuration ID. +This eliminates the need to have a tool deployment for each XBlock. + +How to Setup +============ + +1. Install and setup the `openedx-ltistore`_ plugin on the LMS and Studio. +2. Go to LMS admin -> WAFFLE_UTILS -> Waffle flag course override + (http://localhost:18000/admin/waffle_utils/waffleflagcourseoverridemodel/). +3. Create a waffle flag course override with these values: + - Waffle flag: lti_consumer.enable_external_config_filter + - Course id: + - Override choice: Force On + - Enabled: True +4. Create a new external LTI configuration and use it in the XBlock. + This is explained in the README of the `openedx-ltistore`_ repository. + +.. _openedx-ltistore: https://github.com/open-craft/openedx-ltistore + Getting Help ************ diff --git a/lti_consumer/__init__.py b/lti_consumer/__init__.py index 030511cb..22663674 100644 --- a/lti_consumer/__init__.py +++ b/lti_consumer/__init__.py @@ -4,4 +4,4 @@ from .apps import LTIConsumerApp from .lti_xblock import LtiConsumerXBlock -__version__ = '9.6.2' +__version__ = '9.7.0' diff --git a/lti_consumer/api.py b/lti_consumer/api.py index dc1b29d2..768b3649 100644 --- a/lti_consumer/api.py +++ b/lti_consumer/api.py @@ -141,10 +141,18 @@ def get_lti_1p3_launch_info( deep_linking_content_items = [item.attributes for item in dl_content_items] config_id = lti_config.config_id + client_id = lti_config.lti_1p3_client_id + + # Display LTI launch information from external configuration. + # if an external configuration is being used. + if lti_config.config_store == lti_config.CONFIG_EXTERNAL: + external_config = get_external_config_from_filter({}, lti_config.external_id) + config_id = lti_config.external_id.replace(':', '/') + client_id = external_config.get('lti_1p3_client_id') # Return LTI launch information for end user configuration return { - 'client_id': lti_config.lti_1p3_client_id, + 'client_id': client_id, 'keyset_url': get_lms_lti_keyset_link(config_id), 'deployment_id': '1', 'oidc_callback': get_lms_lti_launch_link(), diff --git a/lti_consumer/exceptions.py b/lti_consumer/exceptions.py index dec54692..8d3c0c5a 100644 --- a/lti_consumer/exceptions.py +++ b/lti_consumer/exceptions.py @@ -7,3 +7,10 @@ class LtiError(Exception): """ General error class for LTI Consumer usage. """ + + +class ExternalConfigurationNotFound(Exception): + """ + This exception is used when a reusable external configuration + is not found for a given external ID. + """ diff --git a/lti_consumer/lti_xblock.py b/lti_consumer/lti_xblock.py index 44b11d08..50ec65f7 100644 --- a/lti_consumer/lti_xblock.py +++ b/lti_consumer/lti_xblock.py @@ -82,6 +82,7 @@ external_config_filter_enabled, external_user_id_1p1_launches_enabled, database_config_enabled, + EXTERNAL_ID_REGEX, ) log = logging.getLogger(__name__) @@ -691,6 +692,16 @@ def validate_field_data(self, validation, data): _('Custom Parameters should be strings in "x=y" format.'), ))) + # Validate the external config ID. + if ( + data.config_type == 'external' and not + (data.external_config and EXTERNAL_ID_REGEX.match(str(data.external_config))) + ): + _ = self.runtime.service(self, 'i18n').ugettext + validation.add(ValidationMessage(ValidationMessage.ERROR, str( + _('Reusable configuration ID must be set when using external config (Example: "x:y").'), + ))) + # keyset URL and public key are mutually exclusive if data.lti_1p3_tool_key_mode == 'keyset_url': data.lti_1p3_tool_public_key = '' @@ -1664,10 +1675,7 @@ def _get_lti_block_launch_handler(self): """ Return the LTI block launch handler. """ - # The "external" config_type is not supported for LTI 1.3, only LTI 1.1. Therefore, ensure that we define - # the lti_block_launch_handler using LTI 1.1 logic for "external" config_types. - # TODO: This needs to change when the LTI 1.3 support is added to the external config_type in the future. - if self.lti_version == 'lti_1p1' or self.config_type == "external": + if self.lti_version == 'lti_1p1': lti_block_launch_handler = self.runtime.handler_url(self, 'lti_launch_handler').rstrip('/?') else: launch_data = self.get_lti_1p3_launch_data() @@ -1687,10 +1695,8 @@ def _get_lti_1p3_launch_url(self, consumer): """ lti_1p3_launch_url = self.lti_1p3_launch_url.strip() - # The "external" config_type is not supported for LTI 1.3, only LTI 1.1. Therefore, ensure that we define - # the lti_1p3_launch_url using the LTI 1.3 consumer only for config_types that support LTI 1.3. - # TODO: This needs to change when the LTI 1.3 support is added to the external config_type in the future. - if consumer and self.lti_version == "lti_1p3" and self.config_type == "database": + # Get LTI launch URL from consumer if using database or external configuration type. + if consumer and self.lti_version == 'lti_1p3' and self.config_type in ('database', 'external'): lti_1p3_launch_url = consumer.launch_url return lti_1p3_launch_url diff --git a/lti_consumer/models.py b/lti_consumer/models.py index 73351c64..0f39e212 100644 --- a/lti_consumer/models.py +++ b/lti_consumer/models.py @@ -15,6 +15,7 @@ from opaque_keys.edx.django.models import CourseKeyField, UsageKeyField from opaque_keys.edx.keys import CourseKey from config_models.models import ConfigurationModel +from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from lti_consumer.filters import get_external_config_from_filter @@ -31,6 +32,7 @@ get_lti_nrps_context_membership_url, choose_lti_1p3_redirect_uris, model_to_dict, + EXTERNAL_ID_REGEX, ) log = logging.getLogger(__name__) @@ -251,6 +253,12 @@ def clean(self): raise ValidationError({ "config_store": _("LTI Configuration stores on XBlock needs a block location set."), }) + if self.config_store == self.CONFIG_EXTERNAL and not EXTERNAL_ID_REGEX.match(str(self.external_id)): + raise ValidationError({ + "config_store": _( + 'LTI Configuration using reusable configuration needs a external ID in "x:y" format.', + ), + }) if self.version == self.LTI_1P3 and self.config_store == self.CONFIG_ON_DB: if self.lti_1p3_tool_public_key == "" and self.lti_1p3_tool_keyset_url == "": raise ValidationError({ @@ -280,7 +288,7 @@ def sync_configurations(self): otherwise, it will try to query any children configuration and update their fields using the current configuration values. """ - EXCLUDED_FIELDS = ['id', 'config_id', 'location'] + EXCLUDED_FIELDS = ['id', 'config_id', 'location', 'external_config'] if isinstance(self.location, CCXBlockUsageLocator): # Query main configuration using main location. @@ -364,6 +372,13 @@ def lti_1p3_public_jwk(self): self._generate_lti_1p3_keys_if_missing() return json.loads(self.lti_1p3_internal_public_jwk) + @cached_property + def external_config(self): + """ + Return the external resuable configuration. + """ + return get_external_config_from_filter({}, self.external_id) + def _get_lti_1p1_consumer(self): """ Return a class of LTI 1.1 consumer. @@ -374,10 +389,9 @@ def _get_lti_1p1_consumer(self): key, secret = block.lti_provider_key_secret launch_url = block.launch_url elif self.config_store == self.CONFIG_EXTERNAL: - config = get_external_config_from_filter({}, self.external_id) - key = config.get("lti_1p1_client_key") - secret = config.get("lti_1p1_client_secret") - launch_url = config.get("lti_1p1_launch_url") + key = self.external_config.get("lti_1p1_client_key") + secret = self.external_config.get("lti_1p1_client_secret") + launch_url = self.external_config.get("lti_1p1_launch_url") else: key = self.lti_1p1_client_key secret = self.lti_1p1_client_secret @@ -389,11 +403,10 @@ def get_lti_advantage_ags_mode(self): """ Return LTI 1.3 Advantage Assignment and Grade Services mode. """ - if self.config_store == self.CONFIG_EXTERNAL: - # TODO: Add support for CONFIG_EXTERNAL for LTI 1.3. - raise NotImplementedError if self.config_store == self.CONFIG_ON_DB: return self.lti_advantage_ags_mode + elif self.config_store == self.CONFIG_EXTERNAL: + return self.external_config.get('lti_advantage_ags_mode') else: block = compat.load_enough_xblock(self.location) return block.lti_advantage_ags_mode @@ -402,11 +415,10 @@ def get_lti_advantage_deep_linking_enabled(self): """ Return whether LTI 1.3 Advantage Deep Linking is enabled. """ - if self.config_store == self.CONFIG_EXTERNAL: - # TODO: Add support for CONFIG_EXTERNAL for LTI 1.3. - raise NotImplementedError("CONFIG_EXTERNAL is not supported for LTI 1.3 Advantage services: %s") if self.config_store == self.CONFIG_ON_DB: return self.lti_advantage_deep_linking_enabled + elif self.config_store == self.CONFIG_EXTERNAL: + return self.external_config.get('lti_advantage_deep_linking_enabled') else: block = compat.load_enough_xblock(self.location) return block.lti_advantage_deep_linking_enabled @@ -415,11 +427,10 @@ def get_lti_advantage_deep_linking_launch_url(self): """ Return the LTI 1.3 Advantage Deep Linking launch URL. """ - if self.config_store == self.CONFIG_EXTERNAL: - # TODO: Add support for CONFIG_EXTERNAL for LTI 1.3. - raise NotImplementedError("CONFIG_EXTERNAL is not supported for LTI 1.3 Advantage services: %s") if self.config_store == self.CONFIG_ON_DB: return self.lti_advantage_deep_linking_launch_url + elif self.config_store == self.CONFIG_EXTERNAL: + return self.external_config.get('lti_advantage_deep_linking_launch_url') else: block = compat.load_enough_xblock(self.location) return block.lti_advantage_deep_linking_launch_url @@ -428,11 +439,10 @@ def get_lti_advantage_nrps_enabled(self): """ Return whether LTI 1.3 Advantage Names and Role Provisioning Services is enabled. """ - if self.config_store == self.CONFIG_EXTERNAL: - # TODO: Add support for CONFIG_EXTERNAL for LTI 1.3. - raise NotImplementedError("CONFIG_EXTERNAL is not supported for LTI 1.3 Advantage services: %s") if self.config_store == self.CONFIG_ON_DB: return self.lti_advantage_enable_nrps + elif self.config_store == self.CONFIG_EXTERNAL: + return self.external_config.get('lti_advantage_enable_nrps') else: block = compat.load_enough_xblock(self.location) return block.lti_1p3_enable_nrps @@ -453,6 +463,7 @@ def _setup_lti_1p3_ags(self, consumer): return lineitem = self.ltiagslineitem_set.first() + # If using the declarative approach, we should create a LineItem if it # doesn't exist. This is because on this mode the tool is not able to create # and manage lineitems using the AGS endpoints. @@ -572,9 +583,25 @@ def _get_lti_1p3_consumer(self): tool_key=self.lti_1p3_tool_public_key, tool_keyset_url=self.lti_1p3_tool_keyset_url, ) + elif self.config_store == self.CONFIG_EXTERNAL: + consumer = consumer_class( + iss=get_lti_api_base(), + lti_oidc_url=self.external_config.get('lti_1p3_oidc_url'), + lti_launch_url=self.external_config.get('lti_1p3_launch_url'), + client_id=self.external_config.get('lti_1p3_client_id'), + # Deployment ID hardcoded to 1 since + # we're not using multi-tenancy. + deployment_id='1', + rsa_key=self.external_config.get('lti_1p3_private_key'), + rsa_key_id=self.external_config.get('lti_1p3_private_key_id'), + # Registered redirect uris + redirect_uris=self.get_lti_1p3_redirect_uris(), + tool_key=self.external_config.get('lti_1p3_tool_public_key'), + tool_keyset_url=self.external_config.get('lti_1p3_tool_keyset_url'), + ) else: - # This should not occur, but raise an error if self.config_store is not CONFIG_ON_XBLOCK - # or CONFIG_ON_DB. + # This should not occur, but raise an error if self.config_store is not + # CONFIG_ON_XBLOCK, CONFIG_ON_DB or CONFIG_EXTERNAL. raise NotImplementedError if isinstance(consumer, LtiAdvantageConsumer): @@ -598,10 +625,10 @@ def get_lti_1p3_redirect_uris(self): Return pre-registered redirect uris or sensible defaults """ if self.config_store == self.CONFIG_EXTERNAL: - # TODO: Add support for CONFIG_EXTERNAL for LTI 1.3. - raise NotImplementedError - - if self.config_store == self.CONFIG_ON_DB: + redirect_uris = self.external_config.get('lti_1p3_redirect_uris') + launch_url = self.external_config.get('lti_1p3_launch_url') + deep_link_launch_url = self.external_config.get('lti_advantage_deep_linking_launch_url') + elif self.config_store == self.CONFIG_ON_DB: redirect_uris = self.lti_1p3_redirect_uris launch_url = self.lti_1p3_launch_url deep_link_launch_url = self.lti_advantage_deep_linking_launch_url diff --git a/lti_consumer/plugin/urls.py b/lti_consumer/plugin/urls.py index 69780718..a6117b71 100644 --- a/lti_consumer/plugin/urls.py +++ b/lti_consumer/plugin/urls.py @@ -31,6 +31,13 @@ public_keyset_endpoint, name='lti_consumer.public_keyset_endpoint_via_location' ), + # The external ID is split into slashes to make the URL more readable + # and avoid clashing with USAGE_ID_PATTERN. + path( + 'lti_consumer/v1/public_keysets//', + public_keyset_endpoint, + name='lti_consumer.public_keyset_endpoint_via_external_id' + ), re_path( 'lti_consumer/v1/launch/(?:/(?P.*))?$', launch_gate_endpoint, @@ -46,6 +53,13 @@ access_token_endpoint, name='lti_consumer.access_token_via_location' ), + # The external ID is split into slashes to make the URL more readable + # and avoid clashing with USAGE_ID_PATTERN. + path( + 'lti_consumer/v1/token//', + access_token_endpoint, + name='lti_consumer.access_token_via_external_id' + ), re_path( r'lti_consumer/v1/lti/(?P[-\w]+)/lti-dl/response', deep_linking_response_endpoint, diff --git a/lti_consumer/plugin/views.py b/lti_consumer/plugin/views.py index 4757a596..c5436c2d 100644 --- a/lti_consumer/plugin/views.py +++ b/lti_consumer/plugin/views.py @@ -6,7 +6,7 @@ from django.conf import settings from django.contrib.auth import get_user_model -from django.core.exceptions import ObjectDoesNotExist, PermissionDenied, ValidationError +from django.core.exceptions import PermissionDenied, ValidationError from django.db import transaction from django.http import Http404, JsonResponse from django.shortcuts import render @@ -25,8 +25,8 @@ from rest_framework.status import HTTP_200_OK, HTTP_400_BAD_REQUEST, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND from lti_consumer.api import get_lti_pii_sharing_state_for_course, validate_lti_1p3_launch_data -from lti_consumer.exceptions import LtiError -from lti_consumer.lti_1p3.consumer import LtiProctoringConsumer +from lti_consumer.exceptions import LtiError, ExternalConfigurationNotFound +from lti_consumer.lti_1p3.consumer import LtiConsumer1p3, LtiProctoringConsumer from lti_consumer.lti_1p3.exceptions import (BadJwtSignature, InvalidClaimValue, Lti1p3Exception, LtiDeepLinkingContentTypeNotSupported, MalformedJwtToken, MissingRequiredClaim, NoSuitableKeys, TokenSignatureExpired, @@ -48,7 +48,8 @@ from lti_consumer.plugin import compat from lti_consumer.signals.signals import LTI_1P3_PROCTORING_ASSESSMENT_STARTED from lti_consumer.track import track_event -from lti_consumer.utils import _, get_data_from_cache, get_lti_1p3_context_types_claim +from lti_consumer.utils import _, get_data_from_cache, get_lti_1p3_context_types_claim, get_lti_api_base +from lti_consumer.filters import get_external_config_from_filter log = logging.getLogger(__name__) @@ -83,21 +84,50 @@ def has_block_access(user, block, course_key): @require_http_methods(["GET"]) -def public_keyset_endpoint(request, usage_id=None, lti_config_id=None): +def public_keyset_endpoint( + request, + usage_id=None, + lti_config_id=None, + external_app=None, + external_slug=None, +): """ Gate endpoint to fetch public keysets from a problem This is basically a passthrough function that uses the OIDC response parameter `login_hint` to locate the block and run the proper handler. + + Arguments: + lti_config_id (UUID): config_id of the LtiConfiguration + usage_id (UsageKey): location of the Block + external_app (str): App name of the external LTI configuration + external_slug (str): Slug of the external LTI configuration. + + Returns: + JsonResponse or Http404 """ + external_id = f"{external_app}:{external_slug}" + try: if usage_id: lti_config = LtiConfiguration.objects.get(location=UsageKey.from_string(usage_id)) + version = lti_config.version + public_jwk = lti_config.lti_1p3_public_jwk elif lti_config_id: lti_config = LtiConfiguration.objects.get(config_id=lti_config_id) + version = lti_config.version + public_jwk = lti_config.lti_1p3_public_jwk + elif external_app and external_slug: + lti_config = get_external_config_from_filter({}, external_id) + + if not lti_config: + raise ExternalConfigurationNotFound("External LTI configuration not found") - if lti_config.version != lti_config.LTI_1P3: + version = lti_config.get("version") + public_jwk = lti_config.get("lti_1p3_public_jwk", {}) + + if version != LtiConfiguration.LTI_1P3: raise LtiError( "LTI Error: LTI 1.1 blocks do not have a public keyset endpoint." ) @@ -105,14 +135,13 @@ def public_keyset_endpoint(request, usage_id=None, lti_config_id=None): # Retrieve block's Public JWK # The underlying method will generate a new Private-Public Pair if one does # not exist, and retrieve the values. - response = JsonResponse(lti_config.lti_1p3_public_jwk) + response = JsonResponse(public_jwk) response['Content-Disposition'] = 'attachment; filename=keyset.json' return response - except (LtiError, InvalidKeyError, ObjectDoesNotExist) as exc: + except (InvalidKeyError, LtiConfiguration.DoesNotExist, ExternalConfigurationNotFound, LtiError) as exc: log.info( - "Error while retrieving keyset for usage_id (%r) or lit_config_id (%s): %s", - usage_id, - lti_config_id, + "Error while retrieving keyset for ID %s: %s", + usage_id or lti_config_id or external_id, exc, exc_info=True, ) @@ -362,7 +391,13 @@ def launch_gate_endpoint(request, suffix=None): # pylint: disable=unused-argume @csrf_exempt @xframe_options_sameorigin @require_http_methods(["POST"]) -def access_token_endpoint(request, lti_config_id=None, usage_id=None): +def access_token_endpoint( + request, + lti_config_id=None, + usage_id=None, + external_app=None, + external_slug=None, +): """ Gate endpoint to enable tools to retrieve access tokens for the LTI 1.3 tool. @@ -371,6 +406,8 @@ def access_token_endpoint(request, lti_config_id=None, usage_id=None): Arguments: lti_config_id (UUID): config_id of the LtiConfiguration usage_id (UsageKey): location of the Block + external_app (str): App name of the external LTI configuration + external_slug (str): Slug of the external LTI configuration. Returns: JsonResponse or Http404 @@ -379,21 +416,50 @@ def access_token_endpoint(request, lti_config_id=None, usage_id=None): Sucess: https://tools.ietf.org/html/rfc6749#section-4.4.3 Failure: https://tools.ietf.org/html/rfc6749#section-5.2 """ + external_id = f"{external_app}:{external_slug}" try: - if lti_config_id: + if usage_id: + lti_config = LtiConfiguration.objects.get(location=UsageKey.from_string(usage_id)) + version = lti_config.version + lti_consumer = lti_config.get_lti_consumer() + elif lti_config_id: lti_config = LtiConfiguration.objects.get(config_id=lti_config_id) - else: - usage_key = UsageKey.from_string(usage_id) - lti_config = LtiConfiguration.objects.get(location=usage_key) - except LtiConfiguration.DoesNotExist as exc: - log.warning("Error getting the LTI configuration with id %r: %s", lti_config_id, exc, exc_info=True) + version = lti_config.version + lti_consumer = lti_config.get_lti_consumer() + elif external_app and external_slug: + lti_config = get_external_config_from_filter({}, external_id) + + if not lti_config: + raise ExternalConfigurationNotFound("External LTI configuration not found") + + version = lti_config.get("version") + # External LTI configurations don't have a get_lti_consumer method + # so we initialize the LtiConsumer1p3 class using the external config data. + lti_consumer = LtiConsumer1p3( + iss=get_lti_api_base(), + lti_oidc_url=None, + lti_launch_url=None, + client_id=lti_config.get("lti_1p3_client_id"), + deployment_id=None, + rsa_key=lti_config.get("lti_1p3_private_key"), + rsa_key_id=lti_config.get("lti_1p3_private_key_id"), + redirect_uris=None, + tool_key=lti_config.get("lti_1p3_tool_public_key"), + tool_keyset_url=lti_config.get("lti_1p3_tool_keyset_url"), + ) + except (InvalidKeyError, LtiConfiguration.DoesNotExist, ExternalConfigurationNotFound) as exc: + log.info( + "Error while retrieving access token for ID %s: %s", + usage_id or lti_config_id or external_id, + exc, + exc_info=True, + ) raise Http404 from exc - if lti_config.version != lti_config.LTI_1P3: + if version != LtiConfiguration.LTI_1P3: return JsonResponse({"error": "invalid_lti_version"}, status=HTTP_404_NOT_FOUND) - lti_consumer = lti_config.get_lti_consumer() try: token = lti_consumer.access_token( dict(urllib.parse.parse_qsl( diff --git a/lti_consumer/static/js/xblock_studio_view.js b/lti_consumer/static/js/xblock_studio_view.js index b920a12e..e8331be9 100644 --- a/lti_consumer/static/js/xblock_studio_view.js +++ b/lti_consumer/static/js/xblock_studio_view.js @@ -76,29 +76,25 @@ function LtiConsumerXBlockInitStudio(runtime, element) { */ function getFieldsToHideForLtiConfigType() { const configType = $(element).find('#xb-field-edit-config_type').val(); - const configFields = lti1P1FieldList.concat(lti1P3FieldList); + const databaseConfigHiddenFields = lti1P1FieldList.concat(lti1P3FieldList); + const externalConfigHiddenFields = lti1P1FieldList.concat(lti1P3FieldList); const fieldsToHide = []; if (configType === "external") { - // Hide the lti_version field and all the LTI 1.1 and LTI 1.3 fields. - configFields.forEach(function (field) { + // Hide LTI 1.1 and LTI 1.3 tool fields. + externalConfigHiddenFields.forEach(function (field) { fieldsToHide.push(field); }) - fieldsToHide.push("lti_version"); } else if (configType === "database") { // Hide the LTI 1.1 and LTI 1.3 fields. The XBlock will remain the source of truth for the lti_version, // so do not hide it and continue to allow editing it from the XBlock edit menu in Studio. - configFields.forEach(function (field) { + databaseConfigHiddenFields.forEach(function (field) { fieldsToHide.push(field); }) } else { // No fields should be hidden based on a config_type of 'new'. } - if (configType === "external") { - fieldsToHide.push("external_config"); - } - return fieldsToHide; } diff --git a/lti_consumer/tests/unit/plugin/test_views.py b/lti_consumer/tests/unit/plugin/test_views.py index a2c701a2..02baa3cb 100644 --- a/lti_consumer/tests/unit/plugin/test_views.py +++ b/lti_consumer/tests/unit/plugin/test_views.py @@ -30,6 +30,7 @@ from lti_consumer.utils import cache_lti_1p3_launch_data +@ddt.ddt class TestLti1p3KeysetEndpoint(TestCase): """ Test `public_keyset_endpoint` method. @@ -74,6 +75,19 @@ def test_invalid_usage_key(self): response = self.client.get('/lti_consumer/v1/public_keysets/invalid-key') self.assertEqual(response.status_code, 404) + @ddt.data( + ('lti_consumer:lti_consumer.public_keyset_endpoint', ['075194d3-6885-417e-a8a8-6c931e272f00']), + ('lti_consumer:lti_consumer.public_keyset_endpoint_via_location', ['block-v1:x+x+x+type@problem+block@x']), + ('lti_consumer:lti_consumer.public_keyset_endpoint_via_external_id', ['x', 'x']), + ) + @ddt.unpack + def test_nonexistent_configuration(self, url, args): + """ + Check that 404 is returned when there is no configuration found. + """ + response = self.client.get(reverse(url, args=args)) + self.assertEqual(response.status_code, 404) + def test_wrong_lti_version(self): """ Check if trying to fetch the public keyset for LTI 1.1 yields a HTTP code 404. @@ -102,6 +116,25 @@ def test_public_keyset_endpoint_using_lti_config_id_in_url(self): json.loads(response.content.decode('utf-8')) ) + @patch('lti_consumer.plugin.views.get_external_config_from_filter') + def test_public_keyset_endpoint_using_external_id_in_url(self, get_external_config_from_filter): + """ + Check that the endpoint is accessible using the external LTI configuration ID. + """ + external_config = {'version': LtiConfiguration.LTI_1P3, 'lti_1p3_public_jwk': {}} + get_external_config_from_filter.return_value = external_config + + response = self.client.get(reverse( + 'lti_consumer:lti_consumer.public_keyset_endpoint_via_external_id', + args=['x', 'x'] + )) + + get_external_config_from_filter.assert_called_once_with({}, "x:x") + self.assertEqual(response.status_code, 200) + self.assertEqual(response['Content-type'], 'application/json') + self.assertEqual(response['Content-Disposition'], 'attachment; filename=keyset.json') + self.assertEqual(external_config["lti_1p3_public_jwk"], json.loads(response.content.decode('utf-8'))) + @ddt.ddt class TestLti1p3LaunchGateEndpoint(TestCase): @@ -173,7 +206,7 @@ def test_invalid_lti_version(self): self.config.version = LtiConfiguration.LTI_1P3 self.config.save() - def test_non_existant_lti_config(self): + def test_nonexistent_lti_config(self): """ Check that a 404 is returned when LtiConfiguration for a location doesn't exist """ @@ -620,6 +653,7 @@ def test_launch_non_existing_custom_parameters(self, mock_set_custom_parameters) mock_set_custom_parameters.assert_not_called() +@ddt.ddt class TestLti1p3AccessTokenEndpoint(TestCase): """ Test `access_token_endpoint` method. @@ -687,12 +721,65 @@ def test_access_token_endpoint_with_location_in_url(self): self.assertEqual(response.status_code, 200) self.assertEqual(response.json(), token) - def test_non_existant_configuration_for_given_id(self): + @patch('lti_consumer.plugin.views.get_lti_api_base') + @patch('lti_consumer.plugin.views.LtiConsumer1p3') + @patch('lti_consumer.plugin.views.get_external_config_from_filter') + def test_access_token_endpoint_with_external_id_in_url( + self, + get_external_config_from_filter, + lti_consumer, + get_lti_api_base, + ): + """ + Check that the access_token generated by the lti_consumer is returned. + """ + external_config = { + 'version': LtiConfiguration.LTI_1P3, + 'lti_1p3_client_id': 'client-id', + 'lti_1p3_private_key': 'private-key', + 'lti_1p3_private_key_id': 'private-key-id', + 'lti_1p3_tool_public_key': 'public-key', + 'lti_1p3_tool_keyset_url': 'keyset-url', + } + token = {'access_token': 'test-token'} + get_external_config_from_filter.return_value = external_config + lti_consumer.return_value.access_token.return_value = token + + body = self.get_body(create_jwt(self.key, {})) + response = self.client.post( + reverse('lti_consumer:lti_consumer.access_token_via_external_id', args=['x', 'x']), + data=body, + ) + + get_external_config_from_filter.assert_called_once_with({}, 'x:x') + get_lti_api_base.assert_called_once_with() + lti_consumer.assert_called_once_with( + iss=get_lti_api_base(), + lti_oidc_url=None, + lti_launch_url=None, + client_id=external_config['lti_1p3_client_id'], + deployment_id=None, + rsa_key=external_config['lti_1p3_private_key'], + rsa_key_id=external_config['lti_1p3_private_key_id'], + redirect_uris=None, + tool_key=external_config['lti_1p3_tool_public_key'], + tool_keyset_url=external_config['lti_1p3_tool_keyset_url'], + ) + lti_consumer().access_token.called_once_with(body) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), token) + + @ddt.data( + ('lti_consumer:lti_consumer.access_token', ['075194d3-6885-417e-a8a8-6c931e272f00']), + ('lti_consumer:lti_consumer.access_token_via_location', ['block-v1:x+x+x+type@problem+block@x']), + ('lti_consumer:lti_consumer.access_token_via_external_id', ['x', 'x']), + ) + @ddt.unpack + def test_nonexistent_configuration(self, url, args): """ - Check that 404 is returned when there is no configuration for a given id + Check that 404 is returned when there is no configuration found. """ - url = reverse('lti_consumer:lti_consumer.access_token', args=['075194d3-6885-417e-a8a8-6c931e272f00']) - response = self.client.post(url) + response = self.client.post(reverse(url, args=args)) self.assertEqual(response.status_code, 404) def test_verify_lti_version_is_1p3(self): diff --git a/lti_consumer/tests/unit/test_api.py b/lti_consumer/tests/unit/test_api.py index 083f850e..5b3095a1 100644 --- a/lti_consumer/tests/unit/test_api.py +++ b/lti_consumer/tests/unit/test_api.py @@ -538,6 +538,45 @@ def test_launch_info_for_lti_config_without_location(self): } ) + @patch('lti_consumer.api.get_external_config_from_filter') + def test_launch_info_for_lti_config_with_external_configuration( + self, + filter_mock, + ): + """ + Check if the API can return launch info for LtiConfiguration using + an external configuration. + """ + external_id = 'test-app:test-slug' + external_config = {'lti_1p3_client_id': 'test-client-id'} + filter_mock.return_value = external_config + lti_config = LtiConfiguration.objects.create( + version=LtiConfiguration.LTI_1P3, + config_id=_test_config_id, + config_store=LtiConfiguration.CONFIG_EXTERNAL, + external_id=external_id, + ) + + launch_info = get_lti_1p3_launch_info(self._get_lti_1p3_launch_data()) + + self.assertEqual( + launch_info, + { + 'client_id': external_config.get('lti_1p3_client_id'), + 'keyset_url': 'https://example.com/api/lti_consumer/v1/public_keysets/{}'.format( + lti_config.external_id.replace(':', '/'), + ), + 'deployment_id': external_config.get('lti_1p3_deployment_id', '1'), + 'oidc_callback': 'https://example.com/api/lti_consumer/v1/launch/', + 'token_url': 'https://example.com/api/lti_consumer/v1/token/{}'.format( + lti_config.external_id.replace(':', '/'), + ), + 'deep_linking_launch_url': 'http://example.com', + 'deep_linking_content_items': None, + }, + ) + filter_mock.assert_called_once_with({}, external_id) + class TestGetLti1p3LaunchUrl(Lti1P3TestCase): """ diff --git a/lti_consumer/tests/unit/test_lti_xblock.py b/lti_consumer/tests/unit/test_lti_xblock.py index 8ea143b6..c088b012 100644 --- a/lti_consumer/tests/unit/test_lti_xblock.py +++ b/lti_consumer/tests/unit/test_lti_xblock.py @@ -1,9 +1,9 @@ """ Unit tests for LtiConsumerXBlock """ - import json import logging +import string from datetime import timedelta from itertools import product from unittest.mock import Mock, PropertyMock, patch @@ -183,6 +183,73 @@ def test_validate_with_invalid_custom_parameters(self, custom_parameters, add_mo mock_validation_message('error', 'Custom Parameters should be strings in "x=y" format.'), ) + def test_validate_external_config_with_external_config_type(self): + """Test external config ID with external config type.""" + slug = f'{string.digits}{string.ascii_letters}_-' + self.xblock.config_type = 'external' + self.xblock.external_config = f'{slug}:{slug}' + + validation = self.xblock.validate() + + self.assertEqual(validation.messages, []) + + @ddt.data('new', 'database') + def test_validate_external_config_without_external_config_type( + self, + config_type, + ): + """Test with external config ID without using an external config type.""" + self.xblock.config_type = config_type + self.xblock.external_config = None + + validation = self.xblock.validate() + + self.assertEqual(validation.messages, []) + + @patch('lti_consumer.lti_xblock.ValidationMessage') + @patch.object(Validation, 'add') + def test_validate_empty_external_config_with_external_config_type( + self, + add_mock, + mock_validation_message, + ): + """Test with empty external config ID using an external config type.""" + mock_validation_message.ERROR.return_value = 'error' + self.xblock.config_type = 'external' + self.xblock.external_config = None + + self.xblock.validate() + + add_mock.assert_called_once_with( + mock_validation_message( + 'error', + 'Reusable configuration ID must be set when using external config (Example: "x:y").', + ), + ) + + @ddt.data('apptest', 'app:', ':test', 'app::test', f'{string.punctuation}:{string.punctuation}') + @patch('lti_consumer.lti_xblock.ValidationMessage') + @patch.object(Validation, 'add') + def test_validate_invalid_external_config_with_external_config_type( + self, + external_id, + add_mock, + mock_validation_message, + ): + """Test with invalid external config ID using an external config type.""" + mock_validation_message.ERROR.return_value = 'error' + self.xblock.config_type = 'external' + self.xblock.external_config = external_id + + self.xblock.validate() + + add_mock.assert_called_once_with( + mock_validation_message( + 'error', + 'Reusable configuration ID must be set when using external config (Example: "x:y").', + ), + ) + @patch('lti_consumer.lti_xblock.LtiConsumerXBlock.course') def test_validate_lti_id(self, mock_course): """ diff --git a/lti_consumer/tests/unit/test_models.py b/lti_consumer/tests/unit/test_models.py index ac52e96c..78d10536 100644 --- a/lti_consumer/tests/unit/test_models.py +++ b/lti_consumer/tests/unit/test_models.py @@ -154,11 +154,17 @@ def test_repr(self): f"[CONFIG_ON_XBLOCK] lti_1p3 - {dummy_location}" ) - @ddt.data(LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB) - def test_lti_consumer_ags_enabled(self, config_store): + @ddt.data( + LtiConfiguration.CONFIG_ON_XBLOCK, + LtiConfiguration.CONFIG_ON_DB, + LtiConfiguration.CONFIG_EXTERNAL, + ) + @patch('lti_consumer.models.get_external_config_from_filter') + def test_lti_consumer_ags_enabled(self, config_store, filter_mock): """ Check if LTI AGS is properly included when block is graded. """ + filter_mock.return_value = {'lti_advantage_ags_mode': 'programmatic'} config = self._get_1p3_config( config_store=config_store, lti_advantage_ags_mode='programmatic' @@ -186,28 +192,32 @@ def test_lti_consumer_ags_enabled(self, config_store): @ddt.data( {'config_store': LtiConfiguration.CONFIG_ON_XBLOCK, 'expected_value': 'XBlock'}, {'config_store': LtiConfiguration.CONFIG_ON_DB, 'expected_value': 'disabled'}, - {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': None}, + {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': 'external'}, ) @ddt.unpack - def test_get_lti_advantage_ags_mode(self, config_store, expected_value): + @patch('lti_consumer.models.get_external_config_from_filter') + def test_get_lti_advantage_ags_mode(self, filter_mock, config_store, expected_value): """ Check if LTI AGS is properly returned. """ + filter_mock.return_value = {'lti_advantage_ags_mode': 'external'} config = self._get_1p3_config(config_store=config_store, lti_advantage_ags_mode='disabled') self.xblock.lti_advantage_ags_mode = 'XBlock' - if config_store in (LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB): - self.assertEqual(config.get_lti_advantage_ags_mode(), expected_value) - else: - with self.assertRaises(NotImplementedError): - config.get_lti_advantage_ags_mode() + self.assertEqual(config.get_lti_advantage_ags_mode(), expected_value) - @ddt.data(LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB) - def test_lti_consumer_ags_declarative(self, config_store): + @ddt.data( + LtiConfiguration.CONFIG_ON_XBLOCK, + LtiConfiguration.CONFIG_ON_DB, + LtiConfiguration.CONFIG_EXTERNAL, + ) + @patch('lti_consumer.models.get_external_config_from_filter') + def test_lti_consumer_ags_declarative(self, config_store, filter_mock): """ Check that a LineItem is created if AGS is set to the declarative mode. """ + filter_mock.return_value = {'lti_advantage_ags_mode': 'declarative'} self.xblock.lti_advantage_ags_mode = 'declarative' # Include `start` and `due` dates @@ -236,11 +246,17 @@ def test_lti_consumer_ags_declarative(self, config_store): ags_claim.get('scope') ) - @ddt.data(LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB) - def test_lti_consumer_deep_linking_enabled(self, config_store): + @ddt.data( + LtiConfiguration.CONFIG_ON_XBLOCK, + LtiConfiguration.CONFIG_ON_DB, + LtiConfiguration.CONFIG_EXTERNAL, + ) + @patch('lti_consumer.models.get_external_config_from_filter') + def test_lti_consumer_deep_linking_enabled(self, config_store, filter_mock): """ Check if LTI DL is properly instanced when configured. """ + filter_mock.return_value = {'lti_advantage_deep_linking_enabled': True} config = self._get_1p3_config( config_store=config_store, lti_advantage_deep_linking_enabled=True @@ -255,62 +271,56 @@ def test_lti_consumer_deep_linking_enabled(self, config_store): @ddt.data( {'config_store': LtiConfiguration.CONFIG_ON_XBLOCK, 'expected_value': False}, {'config_store': LtiConfiguration.CONFIG_ON_DB, 'expected_value': True}, - {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': None}, + {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': True}, ) @ddt.unpack - def test_get_lti_advantage_deep_linking_enabled(self, config_store, expected_value): + @patch('lti_consumer.models.get_external_config_from_filter') + def test_get_lti_advantage_deep_linking_enabled(self, filter_mock, config_store, expected_value): """ Check if LTI Deep Linking enabled is properly returned. """ + filter_mock.return_value = {'lti_advantage_deep_linking_enabled': True} config = self._get_1p3_config(config_store=config_store, lti_advantage_deep_linking_enabled=True) self.xblock.lti_advantage_deep_linking_enabled = False - if config_store in (LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB): - self.assertEqual(config.get_lti_advantage_deep_linking_enabled(), expected_value) - else: - with self.assertRaises(NotImplementedError): - config.get_lti_advantage_deep_linking_enabled() + self.assertEqual(config.get_lti_advantage_deep_linking_enabled(), expected_value) @ddt.data( {'config_store': LtiConfiguration.CONFIG_ON_XBLOCK, 'expected_value': 'XBlock'}, {'config_store': LtiConfiguration.CONFIG_ON_DB, 'expected_value': 'database'}, - {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': None}, + {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': 'external'}, ) @ddt.unpack - def test_get_lti_advantage_deep_linking_launch_url(self, config_store, expected_value): + @patch('lti_consumer.models.get_external_config_from_filter') + def test_get_lti_advantage_deep_linking_launch_url(self, filter_mock, config_store, expected_value): """ Check if LTI Deep Linking launch URL is properly returned. """ + filter_mock.return_value = {'lti_advantage_deep_linking_launch_url': 'external'} config = self._get_1p3_config(config_store=config_store, lti_advantage_deep_linking_launch_url='database') self.xblock.lti_advantage_deep_linking_launch_url = 'XBlock' - if config_store in (LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB): - self.assertEqual(config.get_lti_advantage_deep_linking_launch_url(), expected_value) - else: - with self.assertRaises(NotImplementedError): - config.get_lti_advantage_deep_linking_launch_url() + self.assertEqual(config.get_lti_advantage_deep_linking_launch_url(), expected_value) @ddt.data( {'config_store': LtiConfiguration.CONFIG_ON_XBLOCK, 'expected_value': False}, {'config_store': LtiConfiguration.CONFIG_ON_DB, 'expected_value': True}, - {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': None}, + {'config_store': LtiConfiguration.CONFIG_EXTERNAL, 'expected_value': True}, ) @ddt.unpack - def test_get_lti_advantage_nrps_enabled(self, config_store, expected_value): + @patch('lti_consumer.models.get_external_config_from_filter') + def test_get_lti_advantage_nrps_enabled(self, filter_mock, config_store, expected_value): """ Check if LTI Deep Linking launch URL is properly returned. """ + filter_mock.return_value = {'lti_advantage_enable_nrps': True} config = self._get_1p3_config(config_store=config_store, lti_advantage_enable_nrps=True) self.xblock.lti_advantage_enable_nrps = False - if config_store in (LtiConfiguration.CONFIG_ON_XBLOCK, LtiConfiguration.CONFIG_ON_DB): - self.assertEqual(config.get_lti_advantage_nrps_enabled(), expected_value) - else: - with self.assertRaises(NotImplementedError): - config.get_lti_advantage_nrps_enabled() + self.assertEqual(config.get_lti_advantage_nrps_enabled(), expected_value) def test_generate_private_key(self): """ @@ -358,6 +368,17 @@ def test_clean(self): self.lti_1p3_config.config_store = self.lti_1p3_config.CONFIG_ON_XBLOCK self.lti_1p3_config.location = None + with self.assertRaises(ValidationError): + self.lti_1p3_config.clean() + + self.lti_1p3_config.config_store = self.lti_1p3_config.CONFIG_EXTERNAL + self.lti_1p3_config.external_id = None + + with self.assertRaises(ValidationError): + self.lti_1p3_config.clean() + + self.lti_1p3_config.external_id = 'invalid-external-id' + with self.assertRaises(ValidationError): self.lti_1p3_config.clean() @@ -370,19 +391,13 @@ def test_clean(self): self.lti_1p3_config_db.clean() self.lti_1p3_config.lti_1p3_proctoring_enabled = True + self.lti_1p3_config.external_id = 'test_id' for config_store in [self.lti_1p3_config.CONFIG_ON_XBLOCK, self.lti_1p3_config.CONFIG_EXTERNAL]: self.lti_1p3_config.config_store = config_store with self.assertRaises(ValidationError): self.lti_1p3_config.clean() - def test_get_redirect_uris_raises_error_for_external_config(self): - """ - An external config raises NotImplementedError - """ - with self.assertRaises(NotImplementedError): - self.lti_1p3_config_external.get_lti_1p3_redirect_uris() - @ddt.data( (LAUNCH_URL, DEEP_LINK_URL, [], [LAUNCH_URL, DEEP_LINK_URL]), (LAUNCH_URL, DEEP_LINK_URL, ["http://other.url"], ["http://other.url"]), @@ -416,6 +431,31 @@ def test_get_redirect_uris_for_db_model_returns_expected( assert self.lti_1p3_config_db.get_lti_1p3_redirect_uris() == expected + @patch('lti_consumer.models.choose_lti_1p3_redirect_uris', return_value=None) + @patch('lti_consumer.models.get_external_config_from_filter') + def test_get_redirect_uris_with_external_config( + self, + get_external_config_from_filter_mock, + choose_lti_1p3_redirect_uris, + ): + """ + Test get_redirect_uris with external configuration. + """ + external_config = { + 'lti_1p3_redirect_uris': ['external-redirect-uris'], + 'lti_1p3_launch_url': LAUNCH_URL, + 'lti_advantage_deep_linking_launch_url': DEEP_LINK_URL, + } + get_external_config_from_filter_mock.return_value = external_config + + self.assertEqual(self.lti_1p3_config_external.get_lti_1p3_redirect_uris(), None) + get_external_config_from_filter_mock.assert_called_once_with({}, self.lti_1p3_config_external.external_id) + choose_lti_1p3_redirect_uris.assert_called_once_with( + external_config['lti_1p3_redirect_uris'], + external_config['lti_1p3_launch_url'], + external_config['lti_advantage_deep_linking_launch_url'], + ) + @patch.object(LtiConfiguration, 'sync_configurations') def test_save(self, sync_configurations_mock): """Test save method.""" @@ -445,7 +485,10 @@ def test_sync_configurations_with_ccx_location( call(location=self.lti_1p3_config.location.to_block_locator()), call().first(), ]) - model_to_dict_mock.assert_called_once_with(filter_mock.return_value.first(), ['id', 'config_id', 'location']) + model_to_dict_mock.assert_called_once_with( + filter_mock.return_value.first(), + ['id', 'config_id', 'location', 'external_config'], + ) setattr_mock.assert_called_once_with(self.lti_1p3_config, 'test', 'test') @patch('lti_consumer.models.isinstance', return_value=False) @@ -468,7 +511,10 @@ def test_sync_configurations_with_location( call().filter().exclude(id=self.lti_1p3_config.pk), call().filter().exclude().update(**model_to_dict_mock), ]) - model_to_dict_mock.assert_called_once_with(self.lti_1p3_config, ['id', 'config_id', 'location']) + model_to_dict_mock.assert_called_once_with( + self.lti_1p3_config, + ['id', 'config_id', 'location', 'external_config'], + ) @patch('lti_consumer.models.isinstance', return_value=False) @patch.object(LtiConfiguration.objects, 'filter', side_effect=IndexError()) diff --git a/lti_consumer/utils.py b/lti_consumer/utils.py index 192c2bc1..a6acbd4d 100644 --- a/lti_consumer/utils.py +++ b/lti_consumer/utils.py @@ -3,6 +3,7 @@ """ import copy import logging +import re from importlib import import_module from urllib.parse import urlencode @@ -19,6 +20,9 @@ log = logging.getLogger(__name__) +SLUG_CHARACTER_CLASS = '[-a-zA-Z0-9_]' +EXTERNAL_ID_REGEX = re.compile(rf'^({SLUG_CHARACTER_CLASS}+:{SLUG_CHARACTER_CLASS}+)$') + def _(text): """