-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Task: Add CSRF for Pylons and Flask. #112
base: ckan_2.8
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
"""Provides a filter to prevent Cross-Site Request Forgery, | ||
based on the Double Submit Cookie pattern, | ||
https://github.com/OWASP/CheatSheetSeries/blob/master/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.md#double-submit-cookie | ||
This is integrated in the CSRFMiddleware | ||
""" | ||
import ckan.lib.base as base | ||
import re | ||
from re import IGNORECASE, MULTILINE | ||
from logging import getLogger | ||
from ckan.common import request | ||
|
||
LOG = getLogger(__name__) | ||
|
||
RAW_RENDER = base.render | ||
RAW_RENDER_JINJA = base.render_jinja2 | ||
RAW_BEFORE = base.BaseController.__before__ | ||
|
||
""" Used as the cookie name and input field name. | ||
""" | ||
TOKEN_FIELD_NAME = 'token' | ||
""" Used to rotate the token cookie periodically. | ||
If the freshness cookie doesn't appear, the token cookie is still OK, | ||
but we'll set a new one for next time. | ||
""" | ||
TOKEN_FRESHNESS_COOKIE_NAME = 'token-fresh' | ||
|
||
# We need to edit confirm-action links, which get intercepted by JavaScript, | ||
#regardless of which order their 'data-module' and 'href' attributes appear. | ||
CONFIRM_LINK = re.compile(r'(<a [^>]*data-module=["\']confirm-action["\'][^>]*href=["\']([^"\']+))(["\'])', IGNORECASE | MULTILINE) | ||
CONFIRM_LINK_REVERSED = re.compile(r'(<a [^>]*href=["\']([^"\']+))(["\'][^>]*data-module=["\']confirm-action["\'])', IGNORECASE | MULTILINE) | ||
|
||
""" | ||
This will match a POST form that has whitespace after the opening tag (which all existing forms do). | ||
Once we have injected a token immediately after the opening tag, | ||
it won't match any more, which avoids redundant injection. | ||
""" | ||
POST_FORM = re.compile(r'(<form [^>]*method=["\']post["\'][^>]*>)([^<]*\s<)', IGNORECASE | MULTILINE) | ||
|
||
"""The format of the token HTML field. | ||
""" | ||
HEX_PATTERN=re.compile(r'^[0-9a-z]+$') | ||
TOKEN_PATTERN = r'<input type="hidden" name="' + TOKEN_FIELD_NAME + '" value="{token}"/>' | ||
TOKEN_SEARCH_PATTERN = re.compile(TOKEN_PATTERN.format(token=r'([0-9a-f]+)')) | ||
API_URL = re.compile(r'^/api\b.*') | ||
|
||
|
||
def apply_token(html, token): | ||
""" Rewrite HTML to insert tokens if applicable. | ||
""" | ||
|
||
token_match = TOKEN_SEARCH_PATTERN.search(html) | ||
if token_match: | ||
token = token_match.group(1) | ||
|
||
def insert_form_token(form_match): | ||
return form_match.group(1) + TOKEN_PATTERN.format(token=token) + form_match.group(2) | ||
|
||
def insert_link_token(link_match): | ||
if '?' in link_match.group(2): | ||
separator = '&' | ||
else: | ||
separator = '?' | ||
return link_match.group(1) + separator + TOKEN_FIELD_NAME + '=' + token + link_match.group(3) | ||
|
||
return CONFIRM_LINK_REVERSED.sub(insert_link_token, CONFIRM_LINK.sub(insert_link_token, POST_FORM.sub(insert_form_token, html))) | ||
|
||
|
||
def get_cookie_token(request): | ||
"""Retrieve the token expected by the server. | ||
This will be retrieved from the 'token' cookie, if it exists. | ||
If not, an error will occur. | ||
""" | ||
token = None | ||
if request.cookies.has_key(TOKEN_FIELD_NAME): | ||
LOG.debug("Obtaining token from cookie") | ||
token = request.cookies.get(TOKEN_FIELD_NAME) | ||
if token is None or token.strip() == "": | ||
csrf_fail("CSRF token is blank") | ||
return token | ||
|
||
|
||
def get_response_token(request, response): | ||
"""Retrieve the token to be injected into pages. | ||
This will be retrieved from the 'token' cookie, if it exists and is fresh. | ||
If not, a new token will be generated and a new cookie set. | ||
""" | ||
# ensure that the same token is used when a page is assembled from pieces | ||
if TOKEN_FIELD_NAME in request.cookies and TOKEN_FRESHNESS_COOKIE_NAME in request.cookies: | ||
LOG.debug("Obtaining token from cookie") | ||
token = request.cookies.get(TOKEN_FIELD_NAME) | ||
if not HEX_PATTERN.match(token): | ||
LOG.debug("Invalid cookie token; making new token cookie") | ||
token = create_response_token(response) | ||
else: | ||
LOG.debug("No fresh token found; making new token cookie") | ||
token = create_response_token(response) | ||
return token | ||
|
||
|
||
def create_response_token(response): | ||
import binascii, os | ||
token = binascii.hexlify(os.urandom(32)) | ||
response.set_cookie(TOKEN_FIELD_NAME, token, secure=True, httponly=True) | ||
response.set_cookie(TOKEN_FRESHNESS_COOKIE_NAME, '1', max_age=600, secure=True, httponly=True) | ||
return token | ||
|
||
|
||
def csrf_fail(message): | ||
from flask import abort | ||
LOG.error(message) | ||
abort(403, "Your form submission could not be validated") | ||
|
||
|
||
def get_post_token(request): | ||
"""Retrieve the token provided by the client. | ||
This is normally a single 'token' parameter in the POST body. | ||
However, for compatibility with 'confirm-action' links, | ||
it is also acceptable to provide the token as a query string parameter, | ||
if there is no POST body. | ||
""" | ||
if request.environ['webob.adhoc_attrs'].has_key(TOKEN_FIELD_NAME): | ||
return request.token | ||
|
||
# handle query string token if there are no POST parameters | ||
# this is needed for the 'confirm-action' JavaScript module | ||
if not request.POST and len(request.GET.getall(TOKEN_FIELD_NAME)) == 1: | ||
request.token = request.GET.getone(TOKEN_FIELD_NAME) | ||
del request.GET[TOKEN_FIELD_NAME] | ||
return request.token | ||
|
||
postTokens = request.POST.getall(TOKEN_FIELD_NAME) | ||
if not postTokens: | ||
csrf_fail("Missing CSRF token in form submission") | ||
elif len(postTokens) > 1: | ||
csrf_fail("More than one CSRF token in form submission") | ||
else: | ||
request.token = postTokens[0] | ||
|
||
# drop token from request so it doesn't populate resource extras | ||
del request.POST[TOKEN_FIELD_NAME] | ||
|
||
return request.token | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import anti_csrf | ||
import webob | ||
from webob.exc import HTTPForbidden | ||
|
||
from flask import Flask, make_response, render_template, request | ||
from ckan.common import config, is_flask_request | ||
import ckan.lib.base as base | ||
|
||
import logging | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
CSRF_ERR = 'CSRF authentication failed. Token missing or invalid.' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't used? |
||
|
||
domain = config.get('ckan.ontario_theme.csrf_domain', '') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Referrer checks should probably be optional, but if they're going to be implemented, shouldn't they be derived from |
||
|
||
|
||
|
||
def after_request_function(response): | ||
#request = Request(environ) | ||
#self.session = environ['beaker.session'] | ||
#self.session.save() | ||
#if is_valid(): | ||
resp = response | ||
#else: | ||
# return response | ||
if 'text/html' in resp.headers.get('Content-type', ''): | ||
token = anti_csrf.get_response_token(request, resp) | ||
new_response = anti_csrf.apply_token(resp.get_data(as_text=True), token) | ||
resp.set_data(new_response) | ||
return resp | ||
|
||
else: | ||
return response | ||
|
||
def is_valid():# | ||
return is_safe() or unsafe_request_is_valid() | ||
|
||
def unsafe_request_is_valid(): | ||
# return is_secure() and good_referer() and \ | ||
# good_origin() and check_token() | ||
return check_token() | ||
|
||
############################################################################## | ||
|
||
|
||
def is_secure(): | ||
# allow requests which have the x-forwarded-proto of https (inserted by nginx) | ||
if request.headers.get('X-Forwarded-Proto') == 'https': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Headers can be forged. |
||
return True | ||
return request.scheme == 'https' | ||
|
||
def is_safe(): | ||
"Check if the request is 'safe', if the request is safe it will not be checked for csrf" | ||
# api requests are exempt from csrf checks | ||
if request.path.startswith("/api"): | ||
return True | ||
|
||
# get/head/options/trace are exempt from csrf checks | ||
return request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TRACE should probably be entirely blocked, not given special treatment. It actually enables some types of CSRF filter bypass, by exposing the token cookie. |
||
|
||
def good_referer(): | ||
"Returns true if the referrer is https and matching the host" | ||
if not request.headers.get('Referer'): | ||
return False | ||
else: | ||
match = "https://{}".format(domain) | ||
return request.headers.get('Referer').startswith(match) | ||
|
||
def good_origin(): | ||
""" | ||
checks if the origin header is present and matches the header" | ||
:param domain: string: the expected origin domain | ||
:return: boolean: true if the origin header is present and matches the expected domain | ||
""" | ||
origin = request.headers.get('origin', None) | ||
if not origin: | ||
log.warning("Potentially unsafe CSRF request is missing the origin header") | ||
return True | ||
else: | ||
match = "https://{}".format(domain) | ||
return origin.startswith(match) | ||
|
||
def _get_post_token(): | ||
"""Retrieve the token provided by the client. Or return None if not present | ||
This is normally a single 'token' parameter in the POST body. | ||
However, for compatibility with 'confirm-action' links, | ||
it is also acceptable to provide the token as a query string parameter, | ||
if there is no POST body. | ||
""" | ||
if request.environ['webob.adhoc_attrs'].has_key(anti_csrf.TOKEN_FIELD_NAME): | ||
return request.token | ||
# handle query string token if there are no POST parameters | ||
# this is needed for the 'confirm-action' JavaScript module | ||
if not request.method == 'POST' and (request.args.get(anti_csrf.TOKEN_FIELD_NAME) and len(request.args.get(anti_csrf.TOKEN_FIELD_NAME)) == 1): | ||
token = request.args.get(TOKEN_FIELD_NAME) | ||
#del request.GET[anti_csrf.TOKEN_FIELD_NAME] | ||
return token | ||
post_tokens = request.form.getlist(anti_csrf.TOKEN_FIELD_NAME) | ||
if not post_tokens or len(post_tokens) != 1: | ||
return None | ||
token = post_tokens[0] | ||
# drop token from request so it doesn't populate resource extras | ||
#del request.POST[anti_csrf.TOKEN_FIELD_NAME] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this commented out? Does it not apply to Flask requests? |
||
return token | ||
|
||
def get_cookie_token(): | ||
"""Retrieve the token expected by the server. | ||
This will be retrieved from the 'token' cookie | ||
""" | ||
if anti_csrf.TOKEN_FIELD_NAME in request.cookies: | ||
log.debug("Obtaining token from cookie") | ||
return request.cookies.get(anti_csrf.TOKEN_FIELD_NAME) | ||
else: | ||
return None | ||
|
||
def check_token(): | ||
log.debug("Checking token matches Token {}, cookie_token: {}".format(_get_post_token(), get_cookie_token())) | ||
return _get_post_token() is not None and _get_post_token() == get_cookie_token() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this file really necessary when we have
orig_anti_csrf.py
?