Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding ip restrictions to login process #849

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 80 additions & 3 deletions py4web/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import urllib
import uuid
import json

from pydal.validators import (
CRYPT,
Expand Down Expand Up @@ -36,7 +37,75 @@
[ ] Lock account after x failed login attempts.
[ ] Force new password every x days.
"""

BLCOKED_IP_MAX_COUNT=5
# timeout for blocked IP: if BLOCKED_IP_TIMEOUT=0 blocking never expired
BLOCKED_IP_TIMEOUT=5*60 # reenable blocked ip after this time in seconds
FILE_BLCOKED_IP_COUNT="host_ip_blocked.txt"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look into core.py for DatabaseErrorLogger. It creates a global database .service/service.storage. Perhaps make auth create a new "web2py_managed_ips" in there with fields "ip, allowed, blocked, counter" and use those.

FILE_HOST_IP_DENY="host_ip_deny.txt"
FILE_HOST_IP_ALLOW="host_ip_allow.txt"
host_ip_allow=[]
host_ip_deny=[]
host_ip_blocked={}
try:
with open(FILE_HOST_IP_ALLOW, "r") as fp:
host_ip_allow = json.load(fp)
except:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pylint will not like this

pass
try:
with open(FILE_HOST_IP_DENY, "r") as fp:
host_ip_deny = json.load(fp)
except:
pass

try:
with open(FILE_BLCOKED_IP_COUNT, "r") as fp:
host_ip_blocked = json.load(fp)
except:
pass

def get_timestamp(): # returns timestamp in seconds from a specific time in the past
return int((datetime.datetime.utcnow()-datetime.datetime(1,1,1,1,1,1)).total_seconds())

def register_failed_login_ip(): # called if error in credentials
client_ip=request.remote_addr
blocked_ip=None
if host_ip_blocked:
blocked_ip = host_ip_blocked[client_ip]
else: # start new registration with timestamp in seconds
host_ip_blocked[client_ip]={'count':0,'last_failed':get_timestamp()}
blocked_ip = host_ip_blocked[client_ip]
blocked_ip['last_failed']=get_timestamp() # update with the last failed time
print(request.path)
if blocked_ip['count']< BLCOKED_IP_MAX_COUNT:
blocked_ip['count']+=1
with open(FILE_BLCOKED_IP_COUNT, "w") as fp:
json.dump(host_ip_blocked, fp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately this is not thread safe.


# called after successfule credentials validation to check if not blocked on too many attempts
# or black or white listed
def ip_allow():
client_ip=request.remote_addr
is_this_ip_allowd=True
if host_ip_blocked:
blocked_ip =host_ip_blocked[client_ip]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice. Even if reading from DB make sure all blocked ips are read at statup to minimize disk IO at every request.

if blocked_ip: #this ip is found in the blocked list
if blocked_ip['count']>=BLCOKED_IP_MAX_COUNT: # check the failed login count for this ip
if get_timestamp()-blocked_ip['last_failed']>BLOCKED_IP_TIMEOUT: #however if timeout from first fail, then reset the counter and timestamp.
del host_ip_blocked[client_ip]
with open(FILE_BLCOKED_IP_COUNT, "w") as fp:
json.dump(host_ip_blocked, fp)
is_this_ip_allowd = True
else:
is_this_ip_allowd=False
if host_ip_allow: #check if allowed list exist
if not( client_ip in host_ip_allow):# check if listed as allowed ip
is_this_ip_allowd=False
if host_ip_deny: #check if deny list exist.This will override the allow list if exist
if client_ip in host_ip_deny: # check if listed as denyed ip
is_this_ip_allowd=False


return is_this_ip_allowd

def b16e(text):
"""convert unicode to b16 unicode"""
Expand Down Expand Up @@ -198,6 +267,7 @@ class Auth(Fixture):
"account_is_blocked": "Account is blocked",
"account_needs_to_be_approved": "Account needs to be approved",
"invalid_credentials": "Invalid Credentials",
"invalid_ip": "Invalid credentials",
"invalid_token": "invalid token",
"password_doesnt_match": "Password doesn't match",
"invalid_current_password": "invalid current password",
Expand Down Expand Up @@ -657,10 +727,15 @@ def login(self, email, password):
user = db(field == value).select().first()
if user and not (CRYPT()(password)[0] == user.password):
user = None

err_msg = "invalid_credentials"
if user:
if not ip_allow():
user = None
err_msg="invalid_ip" ## this woill display Invalid credentials insted od invalid Credentials (lower case c)
# then check for possible login blockers

if not user:
error = "invalid_credentials"
error = err_msg
elif (user["action_token"] or "").startswith("pending-registration:"):
error = "registration_is_pending"
elif user["action_token"] == "account-blocked":
Expand All @@ -670,6 +745,7 @@ def login(self, email, password):

# return the error or the user
if error:
register_failed_login_ip()
return (None, self.param.messages["errors"].get(error, error))
return (user, None)

Expand Down Expand Up @@ -1544,6 +1620,7 @@ def login_buttons(self):
]
)


return dict(buttons=top_buttons, combined_div=combined_div)

def login(self, model=False):
Expand Down