Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions example/api_rate_limit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from botdetection import RequestContext, RequestInfo, too_many_requests

import flask
import werkzeug


# Rate-limit settings read by ``api_rate_filter_request`` below.
API_WINDOW = 3600
"""Time (sec) before sliding window for API requests (format != html) expires."""

# A request is rejected once the counter of its network exceeds this value.
API_MAX = 4
"""Maximum requests from one IP in the :py:obj:`API_WINDOW`"""


def api_rate_filter_request(
    context: RequestContext,
    request_info: RequestInfo,
    request: flask.Request,
) -> werkzeug.Response | None:
    """Rate-limit non-HTML ("API") requests with a sliding-window counter.

    A request counts as an API request when its ``format`` query argument is
    present and different from ``"html"``.  Each such request increments a
    counter keyed by the compressed network of the client.

    :param context: request context; its ``redislib`` holds the counters and
        may be ``None`` when no Redis client is configured.
    :param request_info: client information; ``network.compressed`` is used
        as the counter key.
    :param request: the incoming Flask request.
    :return: the response built by ``too_many_requests`` once the counter
        exceeds ``API_MAX`` within ``API_WINDOW`` seconds, otherwise ``None``
        (the request passes this filter).
    """
    if request.args.get("format", "html") == "html":
        return None

    redislib = context.redislib
    if redislib is None:
        # Without a Redis backend there is nothing to count against; let the
        # request pass instead of failing with an AttributeError.
        return None

    key = "ip_limit.API_WINDOW:" + request_info.network.compressed
    count = redislib.incr_sliding_window(key, API_WINDOW)
    if count > API_MAX:
        return too_many_requests(request_info, "too many request in API_WINDOW")
    return None
101 changes: 101 additions & 0 deletions example/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os
import logging
import tomllib

from redis import Redis
from flask import Flask, render_template, request
from botdetection import install_botdetection, RouteFilter, Config, PredefinedRequestFilter

from api_rate_limit import api_rate_filter_request


# Flask application that hosts the demo routes defined further down.
app = Flask("botdetection demo")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Load the configuration that is passed to the middleware
def get_config() -> Config:
    """Build the botdetection ``Config`` from ``config.toml``.

    The file name is relative, so it is resolved against the current working
    directory.  If the file cannot be opened or read, the problem is logged
    and ``Config`` is created without any arguments.

    :return: the ``Config`` built from the parsed TOML content.
    """
    config_raw = {}
    try:
        with open("config.toml", "rb") as f:
            config_raw = tomllib.load(f)
    except OSError as exc:
        # Best effort: a missing or unreadable file is not fatal for the demo.
        logger.warning("Error loading config.toml: %s", exc)
    return Config(**config_raw)


# A Redis client is created unless the REDIS environment variable is set to
# something other than "1"; in that case no client is used at all.
redis = Redis.from_url("redis://localhost:6379/0") if os.getenv("REDIS", "1") == "1" else None


# Filter lists per route; handed to install_botdetection() below.
route_filter = RouteFilter(
    {
        # health check: no request filters listed
        "/healthz": [],
        "/search": [
            PredefinedRequestFilter.HTTP_ACCEPT,
            PredefinedRequestFilter.HTTP_ACCEPT_ENCODING,
            PredefinedRequestFilter.HTTP_ACCEPT_LANGUAGE,
            PredefinedRequestFilter.HTTP_USER_AGENT,
            # custom filter imported from api_rate_limit
            api_rate_filter_request,
            PredefinedRequestFilter.IP_LIMIT,
        ],
        # NOTE(review): presumably the fallback for every route not listed
        # above -- confirm against RouteFilter
        "*": [
            PredefinedRequestFilter.HTTP_USER_AGENT,
        ],
    }
)


# The middleware is installed by default; only BOTDETECTION=0 switches it off.
if os.getenv("BOTDETECTION", "1") == "0":
    logger.warning("botdetection is NOT installed")
else:
    logger.warning("botdetection is installed")
    install_botdetection(app, redis, get_config(), route_filter)


@app.route("/")
def index():
    """Render the demo page with the client IP and the botdetection status."""
    # The attribute only exists on the request when the botdetection
    # middleware is installed.
    context = getattr(request, "botdetection_context", None)

    if not context:
        # botdetection is not active: fall back to the address Flask reports
        return render_template(
            "index.html",
            ip=request.remote_addr,
            botdetection_enabled=False,
            link_token=False,
        )

    return render_template(
        "index.html",
        ip=request.botdetection_request_info.real_ip,
        botdetection_enabled=True,
        link_token=context.link_token is not None,
    )


@app.route("/search")
def search():
    """Return a fixed list of dummy search results."""
    hits = ["aa", "bb", "cc"]
    return {"results": hits}


@app.route("/healthz")
def healthz():
    """Return a constant health-check payload."""
    payload = {"status": "ok"}
    return payload


# Start the application with Flask's run() when executed as a script.
if __name__ == "__main__":
    app.run()
16 changes: 16 additions & 0 deletions example/templates/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<!DOCTYPE html>
<html>
<head>
<title>Title</title>
<!-- botdetection_html_header is provided by the middleware -->
{%- if botdetection_html_header is defined -%}
{{ botdetection_html_header() | safe }}
{%- endif -%}
</head>
<body>
<h1>Demo of the botdetection</h1>
<p>Client IP address: {{ ip }}</p>
<p>BotDetection: {% if botdetection_enabled %}enabled{% else %}disabled{% endif %}</p>
<p>Link Token: {% if link_token %}enabled{% else %}disabled{% endif %}</p>
</body>
</html>
9 changes: 5 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ requires-python = ">=3.8"
license = {file = "LICENSE"}
keywords = ["botdetection", "flask", "SearXNG"]
authors = [
{name = "Markus Heiser", email = "[email protected]" }
{name = "Markus Heiser", email = "[email protected]" },
{name = "Alexandre Flament", email = "[email protected]" }
]
maintainers = [
{name = "Markus Heiser", email = "[email protected]" }
{name = "Markus Heiser", email = "[email protected]" },
{name = "Alexandre Flament", email = "[email protected]" }
]

classifiers = [
Expand All @@ -27,9 +29,8 @@ classifiers = [

dependencies = [
"flask",
"pytomlpp",
"redis",
"typing_extensions",
"pydantic>=2.6.3",
# "click",
# "pydnsbl",
# "netaddr",
Expand Down
57 changes: 24 additions & 33 deletions src/botdetection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,35 @@
Implementations used for bot detection.

"""
from __future__ import annotations
from logging import getLogger

from dataclasses import dataclass
import pathlib
from flask import Flask
from redis import Redis

import redis
from .config import Config
from botdetection.config import Config
from botdetection._redislib import RedisLib
from botdetection._request_info import RequestInfo
from botdetection._botdetection import BotDetection
from botdetection.request_filters import RequestFilter, PredefinedRequestFilter, RouteFilter
from botdetection._helpers import too_many_requests
from botdetection._request_context import RequestContext

from ._helpers import logger
from ._helpers import dump_request
from ._helpers import get_real_ip
from ._helpers import get_network
from ._helpers import too_many_requests

logger = logger.getChild('init')
__all__ = [
"install_botdetection",
"too_many_requests",
"Config",
"RequestContext",
"RequestInfo",
"RequestFilter",
"PredefinedRequestFilter",
"RouteFilter",
"RedisLib",
]

__all__ = ['dump_request', 'get_network', 'get_real_ip', 'too_many_requests']

CFG_SCHEMA = pathlib.Path(__file__).parent / "schema.toml"
"""Base configuration (schema) of the botdetection."""
logger = getLogger(__name__)

CFG_DEPRECATED = {
# "dummy.old.foo": "config 'dummy.old.foo' exists only for tests. Don't use it in your real project config."
}


@dataclass
class Context:
    """A global context of the botdetection.

    Bundles the (optional) Redis client and the configuration object.
    """

    # pylint: disable=too-few-public-methods

    # Redis client; stays ``None`` until init() assigns one.
    redis_client: redis.Redis | None = None
    # Created once, when the class body is executed, from the schema file only
    # (``cfg_file=None``).
    # NOTE(review): this default object is shared by all Context instances and
    # init() modifies it in place -- confirm this is intended.
    cfg: Config = Config.from_toml(schema_file=CFG_SCHEMA, cfg_file=None, deprecated=CFG_DEPRECATED)

    def init(self, toml_cfg: pathlib.Path, redis_client: redis.Redis | None):
        """Store *redis_client* and load the TOML file *toml_cfg* into ``cfg``."""
        self.redis_client = redis_client
        self.cfg.load_toml(toml_cfg)


ctx = Context()
def install_botdetection(app: Flask, redis: "Redis | None", config: Config, request_filter: RequestFilter) -> None:
    """Create a ``BotDetection`` for *app* and store it as ``app.botdetection``.

    :param app: the Flask application to protect.
    :param redis: Redis client, or ``None`` to run without Redis.
    :param config: configuration passed on to ``BotDetection``.
    :param request_filter: filter passed on to ``BotDetection``.
    """
    app.botdetection = BotDetection(app, redis, config, request_filter)
106 changes: 106 additions & 0 deletions src/botdetection/_botdetection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint

import logging

from ipaddress import ip_address
from flask import Flask, Response, request, render_template_string, make_response

from redis import Redis

from botdetection import _ip_lists
from botdetection._helpers import get_network, get_real_ip
from botdetection.config import Config
from botdetection._redislib import RedisLib
from botdetection._request_info import RequestInfo
from botdetection._request_context import RequestContext
from botdetection.request_filters import RequestFilter
from botdetection._link_token import get_link_token


logger = logging.getLogger(__name__)


class BotDetection:
    """Hook the bot detection into a Flask application.

    Creating an instance registers three things on *app*:

    - a Jinja context processor that provides ``botdetection_html_header``,
    - the ``/client<token>.css`` endpoint used by the link token,
    - a ``before_request`` hook that checks the pass-/block-lists and then
      applies *request_filter*.
    """

    def __init__(self, app: Flask, redis: Redis, config: Config, request_filter: RequestFilter):
        """Store the collaborators and register all hooks on *app*.

        :param app: Flask application the hooks are registered on.
        :param redis: Redis client; when falsy, ``self.redislib`` is ``None``.
        :param config: configuration; ``botdetection.redis.prefix`` and
            ``botdetection.redis.secret_hash`` are handed to ``RedisLib``.
        :param request_filter: callable invoked as
            ``request_filter(context, request_info, request)`` for each
            request that is neither pass- nor block-listed.
        """
        self.app = app
        self.config = config
        self.request_filter = request_filter
        prefix = config.botdetection.redis.prefix
        secret = config.botdetection.redis.secret_hash
        self.redislib = RedisLib(redis, prefix, secret) if redis else None
        self.register_jinja_globals()
        self.register_endpoints()
        self.register_before_request()

    def register_before_request(self):
        """Register the ``before_request`` hook that filters each request."""

        @self.app.before_request
        def before_request():
            real_ip = ip_address(get_real_ip(self.config, request))
            network = get_network(self.config, real_ip)
            request_info = RequestInfo(real_ip, network)

            link_token = get_link_token(self.redislib, self.config, request_info, request)
            context = RequestContext(self.config, self.redislib, link_token)

            # Expose both objects on the request so views, the template
            # helper and the link-token endpoint can read them.
            request.botdetection_context = context
            request.botdetection_request_info = request_info

            if request_info.network.is_link_local and not context.config.botdetection.ip_limit.filter_link_local:
                logger.debug(
                    "network %s is link-local -> not monitored by ip_limit method",
                    request_info.network.compressed,
                )
                return None

            # block- & pass- lists
            #
            # 1. The IP of the request is first checked against the pass-list; if the IP
            #    matches an entry in the list, the request is not blocked.
            # 2. If no matching entry is found in the pass-list, then a check is made against
            #    the block list; if the IP matches an entry in the list, the request is
            #    blocked.
            # 3. If the IP is not in either list, the request is not blocked.
            match, msg = _ip_lists.pass_ip(request_info.real_ip, self.config)
            if match:
                logger.warning("PASS %s: matched PASSLIST - %s", request_info.network.compressed, msg)
                return None

            match, msg = _ip_lists.block_ip(request_info.real_ip, self.config)
            if match:
                logger.error("BLOCK %s: matched BLOCKLIST - %s", request_info.network.compressed, msg)
                return make_response((f"IP is on BLOCKLIST - {msg}", 429))

            # Apply the filter(s): a response rejects the request, ``None``
            # accepts it.
            return self.request_filter(context, request_info, request)

    def register_jinja_globals(self):
        """Register the context processor providing ``botdetection_html_header``."""
        template_string = """
<link rel="stylesheet" href="{{ url_for('client_token', token=link_token) }}" type="text/css" />
"""

        @self.app.context_processor
        def inject_bot_detector():
            def botdetection_html_header():
                link_token = request.botdetection_context.link_token
                if link_token is None:
                    # no link token
                    return ""
                # link_token is initialized
                token = link_token.get_token()
                # TODO: find the equivalent of flask.Markup and use it, so that
                # templates do not have to mark the result as safe themselves.
                return render_template_string(template_string, link_token=token)

            return {"botdetection_html_header": botdetection_html_header}

    def register_endpoints(self):
        """Register the ``/client<token>.css`` endpoint of the link token."""

        @self.app.route("/client<token>.css", methods=["GET"])
        def client_token(token=None):
            link_token = request.botdetection_context.link_token
            # The link token can be None (see botdetection_html_header); the
            # endpoint must still answer with an empty stylesheet instead of
            # failing with an AttributeError.
            if link_token is not None:
                link_token.ping(token)
            return Response("", mimetype="text/css")
Loading