Skip to content

Commit

Permalink
SimpleTokenPlugin and JwtTokenPlugin
Browse files Browse the repository at this point in the history
  • Loading branch information
mdipierro committed Jan 12, 2025
1 parent 825ca4b commit 75f596b
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 23 deletions.
16 changes: 16 additions & 0 deletions apps/_scaffold/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,22 @@
)
)

# #######################################################
# Enable optional API token plugins
# #######################################################

# curl -H "Authorization: Bearer {token}"
# create tokens in db.auth_simple_token
#
# simple_token_plugin = SimpleTokenPlugin(auth)
# auth.token_plugins.append(simple_token_plugin)

# curl -H "Authorization: Bearer {token}"
# create tokens with JwtTokenPlugin(auth).make(user, expiration)
#
# jwt_token_plugin = JwtTokenPlugin(auth)
# auth.token_plugins.append(jwt_token_plugin)

# #######################################################
# Define a convenience action to allow users to download
# files uploaded and reference by Field(type='upload')
Expand Down
172 changes: 149 additions & 23 deletions py4web/utils/auth.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import base64
import calendar
import copy
import datetime
import hashlib
import random
import re
import time
import uuid

from pydal.validators import (CRYPT, IS_EMAIL, IS_EQUAL_TO, IS_MATCH,
IS_NOT_EMPTY, IS_NOT_IN_DB, IS_STRONG)
import jwt

from pydal.validators import (
CRYPT,
IS_EMAIL,
IS_EQUAL_TO,
IS_MATCH,
IS_NOT_EMPTY,
IS_NOT_IN_DB,
IS_STRONG,
)
from yatl.helpers import DIV, A

from py4web import HTTP, URL, Field, action, redirect, request, response
from py4web.core import (REGEX_APPJSON, Fixture, Flash, Template, Translator,
utcnow)
from py4web.core import REGEX_APPJSON, Fixture, Flash, Template, Translator, utcnow
from py4web.utils.form import Form, FormStyleDefault
from py4web.utils.param import Param

Expand Down Expand Up @@ -110,14 +117,11 @@ def goto_login(self, message=""):
def on_request(self, context):
"""Checks that we have a user in the session and
the condition is met"""
if "user" not in self.auth.session or "id" not in self.auth.session["user"]:
self.auth.session["recent_activity"] = None
self.goto_login(message="User not logged in")

self.auth.on_request(context)

if "user" not in self.auth.session or "id" not in self.auth.session["user"]:
self.goto_login(message="Login expired")
self.goto_login(message="Login required")

if callable(self.condition) and not self.condition(user):
self.abort_or_redirect("not-authorized", "User not authorized")

Expand Down Expand Up @@ -278,27 +282,47 @@ def __init__(
self._link = None
if db and define_tables:
self.define_tables()
# these are plugins like SAML, OAuth2, etc which delegate authentication
self.plugins = {}
# these are plugins that can retrieve an existing user from a given token
self.token_plugins = []
self.form_source = DefaultAuthForms(self)
self.fix_actions()

def on_request(self, context):
"""Checks that we have a user in the session and
the condition is met"""

# check if a valid user in session
user = self.session.get("user")
if user:
activity = self.session.get("recent_activity")
time_now = calendar.timegm(time.gmtime())
# enforce the optionl auth session expiration time
if (
self.param.login_expiration_time
and activity
and time_now - activity > self.param.login_expiration_time
):
del self.session["user"]
elif not activity or time_now - activity > 6:
self.session["recent_activity"] = time_now
self.session["recent_timestamp"] = utcnow().isoformat()

# else try if this is an API call with token
is_token = False
if not user:
for plugin in self.token_plugins:
user = plugin.get_user()
if user:
is_token = True
break

if not is_token:
# handle the case without a token
if user:
activity = self.session.get("recent_activity")
time_now = calendar.timegm(time.gmtime())
# enforce the optionl auth session expiration time
if "id" not in user or (
self.param.login_expiration_time
and activity
and time_now - activity > self.param.login_expiration_time
):
del self.session["user"]
elif not activity or time_now - activity > 6:
self.session["recent_activity"] = time_now
self.session["recent_timestamp"] = utcnow().isoformat()
else:
# handle the case with a token, no checks required
self.session["user"] = user

def allows(self, action_name):
"""Checks if the provided action is allowed on the Auth object"""
Expand Down Expand Up @@ -2035,3 +2059,105 @@ def _postprocessing(self, action, form=None, user=None):
self.auth.on_accept[action](form, user)
if not form or form.accepted:
redirect(self.auth.session.get(f"_next_{action}") or URL("index"))


class SimpleTokenPlugin:
"""
A simble token bearer handler for auth.token_plugins
Usage:
1) auth.token_plugins.append(SimpleTokenPlugin(auth))
2) Use a grid or form to create a db.auth_simple_token record
3) Use with curl -H "Authorization: Bearer {token}"
This kind of token needs database access to be verified
It can therefore be expired server side.
This is the recommended kind of token.
You can create a management interface for API tokens with
@action("tokens/<path:path>")
@action.uses("generic.html", auth.user)
def _(path):
db.auth_simple_token.user_id.default = auth.user_id
grid = Grid(path, db.auth_simple_token.user_id==auth.user_id, create=True, deletable=True)
return dict(grid=grid)
"""

def __init__(self, auth):
self.auth = auth
auth.db.define_table(
"auth_simple_token",
Field("token", default=uuid.uuid4, unique=True, writable=False),
Field(
"user_id", "reference auth_user", default=auth.user_id, writable=False
),
Field("description"),
Field("expiration_date", "datetime"),
auth.signature,
)

def get_user(self):
authorization = request.headers.get("authorization")
if authorization and authorization.startswith("Bearer "):
db = self.auth.db
token = authorization[6:].strip()
row = db.auth_simple_token(token=token)
if row and row.expiration_date.isoformat() > utcnow().isoformat():
user = db.auth_user(row.user_id)
return user.as_dict()
return None


class JwtTokenPlugin:
"""
A simble token bearer handler for auth.token_plugins
Usage:
1) myplugin = JwtTokenPlugin(auth)
2) auth.token_plugins.append(myplugin)
3) Use token = myplugin.make(user, expiration) to make tokens
4) Use with curl -H "Authorization: Bearer {token}"
This kind of token does not need database access to be verified.
It cannot therefore be expired server side and any server
side change in the user info is not reflected in the info
stored in the token.
This token should only be used when the API needs to be fast
and it is desirable to aboid any database access.
Expiration should not be too far in the future.
When making a token you can put anything you want in user,
but there must be a user["id"] for authentication to succeed.
"""

def __init__(self, auth, secret=None, algorithms=["HS256", "RS256"]):
self.auth = auth
self.secret = secret or self.auth.session.SECRET
self.algorithms = algorithms

def make(self, user, expiration):
if not isinstance(user, dict):
user = user.as_dict()
if not isinstance(expiration, str):
expiration = expiration.isoformat()
data = {"user": user, "expiration": expiration}
token = jwt.encode(data, self.secret, algorithm=self.algorithms[0])
return token

def get_user(self):
authorization = request.headers.get("authorization")
if authorization and authorization.startswith("Bearer "):
token = authorization[6:].strip()
try:
data = jwt.decode(token, self.secret, algorithms=self.algorithms)
except Exception:
return None
expiration = data.get("expiration")
if expiration and expiration > utcnow().isoformat():
user = data.get("user")
return user
return None

0 comments on commit 75f596b

Please sign in to comment.