Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for session tokens from UI Extensions (Checkout) #656

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/session-tokens.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Session tokens

The Shopify Python API library provides helper methods to decode [session tokens](https://shopify.dev/concepts/apps/building-embedded-apps-using-session-tokens). You can use the `decode_from_header` function to extract and decode a session token from an HTTP Authorization header.
The Shopify Python API library provides helper methods to decode [session tokens](https://shopify.dev/concepts/apps/building-embedded-apps-using-session-tokens). You can use the `decode_from_header` function to extract and decode a session token from an HTTP Authorization header (it can be from a UI Extension or an embedded app).

## Basic usage

Expand All @@ -11,6 +11,7 @@ decoded_payload = session_token.decode_from_header(
authorization_header=your_auth_request_header,
api_key=your_api_key,
secret=your_api_secret,
is_extension=True_or_False
)
```

Expand All @@ -29,7 +30,8 @@ def session_token_required(func):
decoded_session_token = session_token.decode_from_header(
authorization_header = request.headers.get('Authorization'),
api_key = SHOPIFY_API_KEY,
secret = SHOPIFY_API_SECRET
secret = SHOPIFY_API_SECRET,
is_extension=False
)
with shopify_session(decoded_session_token):
return func(*args, **kwargs)
Expand Down
13 changes: 8 additions & 5 deletions shopify/session_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ALGORITHM = "HS256"
PREFIX = "Bearer "
REQUIRED_FIELDS = ["iss", "dest", "sub", "jti", "sid"]
EXTENSION_REQUIRED_FIELDS = ["aud", "dest", "jti", "exp", "nbf", "iat"]
LEEWAY_SECONDS = 10


Expand All @@ -33,10 +34,11 @@ class TokenAuthenticationError(SessionTokenError):
pass


def decode_from_header(authorization_header, api_key, secret):
def decode_from_header(authorization_header, api_key, secret, is_extension=False):
session_token = _extract_session_token(authorization_header)
decoded_payload = _decode_session_token(session_token, api_key, secret)
_validate_issuer(decoded_payload)
decoded_payload = _decode_session_token(session_token, api_key, secret, is_extension)
# skip validation for tokens coming from ui-extensions
_validate_issuer(decoded_payload) if not is_extension else None

return decoded_payload

Expand All @@ -48,7 +50,8 @@ def _extract_session_token(authorization_header):
return authorization_header[len(PREFIX) :]


def _decode_session_token(session_token, api_key, secret):
def _decode_session_token(session_token, api_key, secret, is_extension):
required_fields = EXTENSION_REQUIRED_FIELDS if is_extension else REQUIRED_FIELDS
try:
return jwt.decode(
session_token,
Expand All @@ -58,7 +61,7 @@ def _decode_session_token(session_token, api_key, secret):
# AppBridge frequently sends future `nbf`, and it causes `ImmatureSignatureError`.
# Accept few seconds clock skew to avoid this error.
leeway=LEEWAY_SECONDS,
options={"require": REQUIRED_FIELDS},
options={"require": required_fields},
)
except jwt.exceptions.PyJWTError as exception:
six.raise_from(SessionTokenError(str(exception)), exception)
Expand Down
2 changes: 1 addition & 1 deletion test/session_token_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_raises_if_aud_doesnt_match_api_key(self):
with self.assertRaises(session_token.SessionTokenError) as cm:
session_token.decode_from_header(self.build_auth_header(), api_key=self.api_key, secret=self.secret)

self.assertEqual("Invalid audience", str(cm.exception))
self.assertEqual("Audience doesn't match", str(cm.exception))

def test_raises_if_issuer_hostname_is_invalid(self):
self.payload["iss"] = "bad_shop_hostname"
Expand Down
57 changes: 57 additions & 0 deletions test/ui_extension_access_token_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from shopify import session_token
from test.test_helper import TestCase
from datetime import datetime, timedelta

import jwt
import sys

if sys.version_info[0] < 3: # Backwards compatibility for python < v3.0.0
import time


def timestamp(date):
return time.mktime(date.timetuple()) if sys.version_info[0] < 3 else date.timestamp()


class UIExtensionAccessTokenTest(TestCase):
@classmethod
def setUpClass(cls):
cls.secret = "API Secret"
cls.api_key = "API key"

@classmethod
def setUp(cls):
current_time = datetime.now()
cls.payload = {
"dest": "https://test-shop.myshopify.com",
"aud": cls.api_key,
"exp": timestamp((current_time + timedelta(0, 60))),
"nbf": timestamp(current_time),
"iat": timestamp(current_time),
"jti": "6c992878-dbaf-48d1-bb9d-6d9b59814fd1",
}

@classmethod
def build_auth_header(cls):
mock_session_token = jwt.encode(cls.payload, cls.secret, algorithm="HS256")
return "Bearer {session_token}".format(session_token=mock_session_token)

def test_raises_if_token_authentication_header_is_not_bearer(self):
authorization_header = "Bad auth header"

with self.assertRaises(session_token.TokenAuthenticationError) as cm:
session_token.decode_from_header(
authorization_header, api_key=self.api_key, secret=self.secret, is_extension=True
)

self.assertEqual("The HTTP_AUTHORIZATION_HEADER provided does not contain a Bearer token", str(cm.exception))

def test_raises_extension_is_false_and_invalid_payload(self):
authorization_header = self.build_auth_header()

with self.assertRaises(session_token.SessionTokenError) as cm:
session_token.decode_from_header(
authorization_header, api_key=self.api_key, secret=self.secret, is_extension=False
)

self.assertEqual('Token is missing the "iss" claim', str(cm.exception))