diff --git a/py4web/core.py b/py4web/core.py index 541c6a2c..c4e32317 100644 --- a/py4web/core.py +++ b/py4web/core.py @@ -1,5 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +# pylint: disable=too-many-lines,line-too-long,too-many-branches,use-dict-literal,too-many-arguments,too-few-public-methods,too-many-locals,broad-exception-caught,cell-var-from-loop + """PY4WEB - a web framework for rapid development of efficient database driven web applications""" # Standard modules @@ -39,8 +41,6 @@ import portalocker from watchgod import awatch -from . import server_adapters - # Optional web servers for speed try: import gunicorn @@ -60,6 +60,7 @@ import threadsafevariable import yatl +from . import server_adapters from .utils.misc import secure_dumps, secure_loads bottle.DefaultConfig.max_memfile_size = 16 * 1024 * 1024 @@ -122,7 +123,7 @@ # hold all framework hooks in one place # NOTE: `after_request` hooks are not currently used -REQUEST_HOOKS = type("Object", (), dict(before=[])) +REQUEST_HOOKS = type("Object", (), dict(before=[])) # pylint: disable=invalid-name # set to true to debug issues with fixtures DEBUG = False @@ -165,7 +166,7 @@ def required_folder(*parts): path = os.path.join(*parts) if not os.path.exists(path): os.makedirs(path) - assert os.path.isdir(path), "%s is not a folder as required" % path + assert os.path.isdir(path), f"{path} is not a folder as required" return path @@ -178,7 +179,7 @@ def safely(func, exceptions=(Exception,), log=False, default=None): return func() except exceptions as err: if log: - logging.warn(str(err)) + logging.warning(str(err)) return default() if callable(default) else default @@ -188,7 +189,12 @@ def safely(func, exceptions=(Exception,), log=False, default=None): class Node: - def __init__(self, key=None, value=None, t=None, m=None, prev=None, next=None): + """A node for the LRU cache""" + + def __init__( + self, key=None, value=None, t=None, m=None, prev=None, next=None + ): # pylint: disable=redefined-builtin + """create a node of the LRU cache""" self.key, self.value, self.t, self.m, self.prev, self.next = ( key, value, @@ -213,6 +219,7 @@ class Cache: """ def __init__(self, size=1000): + """Create an LRU caching object""" self.free = size self.head = Node() self.tail = Node() @@ -263,10 +270,12 @@ def get(self, key, callback, expiration=3600, monitor=None): return value def memoize(self, expiration=3600): + """Decorator to memorize the output of any fuction""" + def decorator(func): @functools.wraps(func) def memoized_func(*args, **kwargs): - key = "%s:%s:%s:%s" % (func.__module__, func.__name__, args, kwargs) + key = f"{func.__module__}:{func.__name__}:{args}:{kwargs}" return self.get( key, lambda args=args, kwargs=kwargs: func(*args, **kwargs), @@ -283,31 +292,31 @@ def memoized_func(*args, **kwargs): ######################################################################################### -def objectify(obj): +def objectify(obj): # pylint: disable=too-many-return-statements """converts the obj(ect) into a json serializable object""" if isinstance(obj, numbers.Integral): return int(obj) - elif isinstance(obj, (numbers.Rational, numbers.Real)): + if isinstance(obj, (numbers.Rational, numbers.Real)): return float(obj) - elif isinstance(obj, (datetime.date, datetime.datetime, datetime.time)): + if isinstance(obj, (datetime.date, datetime.datetime, datetime.time)): return obj.isoformat().replace("T", " ") - elif isinstance(obj, str): + if isinstance(obj, str): return obj - elif isinstance(obj, dict): + if isinstance(obj, dict): return obj - elif hasattr(obj, "as_list"): + if hasattr(obj, "as_list"): return obj.as_list() - elif hasattr(obj, "as_dict"): + if hasattr(obj, "as_dict"): return obj.as_dict() - elif hasattr(obj, "__iter__") or isinstance(obj, types.GeneratorType): + if hasattr(obj, "__iter__") or isinstance(obj, types.GeneratorType): return list(obj) - elif hasattr(obj, "xml"): + if hasattr(obj, "xml"): return obj.xml() - elif isinstance( + if isinstance( obj, enum.Enum ): # Enum class handled specially to address self reference in __dict__ return dict(name=obj.name, value=obj.value, __class__=obj.__class__.__name__) - elif hasattr(obj, "__dict__") and hasattr(obj, "__class__"): + if hasattr(obj, "__dict__") and hasattr(obj, "__class__"): d = dict(obj.__dict__) d["__class__"] = obj.__class__.__name__ return d @@ -315,6 +324,7 @@ def objectify(obj): def dumps(obj, sort_keys=True, indent=2): + """General purpose memoize function with sane default""" return json.dumps(obj, default=objectify, sort_keys=sort_keys, indent=indent) @@ -323,89 +333,107 @@ def dumps(obj, sort_keys=True, indent=2): ######################################################################################### -class Fixture: - """all fixtures should inherit from this class and have the methods below""" +class LocalUndefined(RuntimeError): + """ + Exception raised trying to access an unitialized thread local + from a Fixture + """ + + +class BareFixture: + """Minimal Fixture class - without thread local logic""" # normally on_success/on_error are only called if none of the previous # on_request failed, if a fixture is_hook then on_error is always called. is_hook = False - def on_request(self, context): - pass # called when a request arrived + def on_request(self, context): # pylint: disable=unused-argument + """Method that will be called when a new HTTP request arrives""" - def on_error(self, context): - pass # called when a request errors + def on_error(self, context): # pylint: disable=unused-argument + """Method that will be called when an HTTP request errors""" + + def on_success(self, context): # pylint: disable=unused-argument + """Method that will be called when an HTTP request succeeds""" - def on_success(self, context): - pass # called when a request is successful -class MetaLocalUndefined(RuntimeError): pass +class Fixture(BareFixture): + """ + Fixture class - with thread local logic + all fixtures should inherit from this class + """ -class MetaLocal: - """special object for safe thread locals""" + ### being logic to handle safe thread local _local = threading.local() @staticmethod - def local_initialize(self): + def local_initialize(obj): """To be called in on_request if the Fixtures needs a thread local dict""" - if not hasattr(MetaLocal._local, "request_ctx"): - MetaLocal._local.request_ctx = {} - if self in MetaLocal._local.request_ctx: - raise RuntimeError(f"initialize_thread_local called twice for {self}") - MetaLocal._local.request_ctx[self] = types.SimpleNamespace() + if not hasattr(Fixture._local, "request_ctx"): + Fixture._local.request_ctx = {} + if obj in Fixture._local.request_ctx: + raise RuntimeError(f"initialize_thread_local called twice for {obj}") + Fixture._local.request_ctx[obj] = types.SimpleNamespace() @staticmethod - def local_delete(self): + def local_delete(obj): """To be called in on_success and on_error to cleat the thread local""" - del MetaLocal._local.request_ctx[self] + del Fixture._local.request_ctx[obj] @property def local(self): """Returns the fixture thread local dict if initialized else a default one""" try: - return MetaLocal._local.request_ctx[self] + return Fixture._local.request_ctx[self] except (AttributeError, KeyError): - raise MetaLocalUndefined(f"thread local not initialized for {self}") from None + raise LocalUndefined(f"thread local not initialized for {self}") from None def is_valid(self): """Checks if the fixture has a valid thread local dict""" try: - MetaLocal._local.request_ctx[self] + Fixture._local.request_ctx[self] # pylint: disable=pointless-statement return True except (AttributeError, KeyError): return False + ### end logic to handle safe thread local -class Translator(pluralize.Translator, Fixture): + +class Translator(BareFixture, pluralize.Translator): """a Fixture wrapper for the pluralize.Translator""" def on_request(self, context): + """Sets the request language from the request header""" # important: pluralize.Translator has its own thread local self.select(request.headers.get("Accept-Language", "en")) def on_success(self, context): - response.headers["Content-Language"] = self.local.tag + """Inject the selected language in the response header""" + response.headers.setdefault("Content-Language", self.local.tag) class DAL(pydal.DAL, Fixture): """a Fixture wrappre for pydal.DAL""" def on_request(self, context): + """Retrieves a database connection from the pool""" # important: the connection pool handles its own thread local self.get_connection_from_pool_or_new() threadsafevariable.ThreadSafeVariable.restore(ICECUBE) def on_error(self, context): + """Rollback and recycle connection""" self.recycle_connection_in_pool_or_close("rollback") def on_success(self, context): + """Commit and recycle connection""" self.recycle_connection_in_pool_or_close("commit") # make sure some variables in pydal are thread safe def thread_safe_pydal_patch(): - Field = pydal.DAL.Field + """Make the selected fields attributes thread local variables""" tsafe_attrs = [ "readable", "writable", @@ -453,7 +481,7 @@ def field_copy(self): ######################################################################################### -class Flash(Fixture, MetaLocal): +class Flash(Fixture): """ flash = Flash() @@ -468,7 +496,8 @@ def index(): """ def on_request(self, context): - MetaLocal.local_initialize(self) + """Retrieves flash message from cookie if present""" + Fixture.local_initialize(self) # when a new request arrives we look for a flash message in the cookie flash = request.get_cookie("py4web-flash") if flash: @@ -477,6 +506,7 @@ def on_request(self, context): self.local.flash = None def on_success(self, context): + """Stores the flash message in cookie""" # if we redirect and have a flash message we move it to the session status = context["status"] if status == 303 and self.local.flash: @@ -492,13 +522,10 @@ def on_success(self, context): else: context["template_inject"] = dict(flash=flash) elif self.local.flash is not None: - response.headers["component-flash"] = json.dumps(flash) - - def on_error(self, context): - """Clears the local to prevent leakage.""" - pass + response.headers.setdefault("component-flash", json.dumps(flash)) def set(self, message, _class="", sanitize=True): + """Stores a message in the object thread safe storage""" # we set a flash message if sanitize: message = yatl.sanitizer.xmlescape(message) @@ -511,19 +538,22 @@ def set(self, message, _class="", sanitize=True): class RenoirXMLEscapeMixin: + """for internal Renoir use""" + def _escape_data(self, data): """Allows Renoir to convert yatl helpers to strings""" return safely( - lambda: data.xml(), default=lambda: self._to_html(self._to_unicode(data)) + lambda: data.xml(), # pylint: disable=unnecessary-lambda + default=lambda: self._to_html(self._to_unicode(data)), ) class RenoirCustomWriter(RenoirXMLEscapeMixin, renoir.writers.Writer): - ... + """for internal Renoir use""" class RenoirCustomEscapeAllWriter(RenoirXMLEscapeMixin, renoir.writers.EscapeAllWriter): - ... + """for internal Renoir use""" class Renoir(renoir.Renoir): @@ -539,35 +569,47 @@ def render( content=None, filename=None, path=".", - context={}, + context=None, delimiters="[[ ]]", cached_renoir_engines=Cache(100), ): """ - renders the template using renoire, same API as yatl.render, does caching of + Renders the template using renoire, same API as yatl.render, does caching of both Renoire engine and source files """ + context = context or {} engine = cached_renoir_engines.get( (path, delimiters), lambda: Renoir(path=path, delimiters=delimiters.split(" "), reload=True), ) if content is not None: - return engine._render(content, context=context) + return engine._render( # pylint: disable=protected-access + content, context=context + ) return engine.render(filename, context=context) class Template(Fixture): + """The Template Fixture class""" + cache = Cache(100) def __init__(self, filename, path=None, delimiters="[[ ]]"): + """Initialized the template object""" self.filename = filename self.path = path self.delimiters = delimiters def on_success(self, context): + """ + Filters the context output through the template + Also injects helpers in the output dict + """ output = context["output"] + + # we only proceed furthed if the output is a dict if not isinstance(output, dict): - return output + return ctx = dict(request=request) ctx.update(HELPERS) @@ -592,7 +634,9 @@ def on_success(self, context): ######################################################################################### -class Session(Fixture, MetaLocal): +class Session(Fixture): + """The Session Fixture""" + # All apps share the same default secret if not specified. # important for _dashboard reload # the actual value is loaded from a file @@ -601,6 +645,7 @@ class Session(Fixture, MetaLocal): @property def params(self): + """Returns the object parameters""" return Session._params[self] def __init__( @@ -613,6 +658,8 @@ def __init__( name="{app_name}_session", ): """ + Creates a session object. + secret is the shared key used to encrypt the session (using algorithm) expiration is in seconds (optional) storage must have a get(key) and set(key,value,expiration) methods @@ -646,19 +693,21 @@ def __init__( @property def __prerequisites__(self): + """Returns the session prerequisite fixtures""" return self.params.prerequisites def load(self): + """Loads a session""" app_name = request.app_name params = self.params self_local = self.local self_local.changed = False self_local.data = {} - self_local.session_cookie_name = params.name.format(app_name=app_name) + self_local.cookie_name = params.name.format(app_name=app_name) self_local.secure = request.url.startswith("https") - raw_token = request.get_cookie( - self_local.session_cookie_name - ) or request.query.get("_session_token") + raw_token = request.get_cookie(self_local.cookie_name) or request.query.get( + "_session_token" + ) if not raw_token and request.method in {"POST", "PUT", "DELETE", "PATCH"}: raw_token = ( request.forms @@ -668,42 +717,51 @@ def load(self): ) if DEBUG: logging.debug("Session token found %s", raw_token) + data = {} + # if we have a token in the query string of cookie if raw_token: try: + # if session i stored serverside if params.storage: + # used token as id and retrieve data token_data = raw_token.encode() json_data = params.storage.get(token_data) if isinstance(json_data, bytes): json_data = json_data.decode("utf8") if json_data: - self_local.data = json.loads(json_data) + data = json.loads(json_data) else: + # rertieve the data from inside the token itself try: - self_local.data = secure_loads( - raw_token, params.secret.encode() - ) + data = secure_loads(raw_token, params.secret.encode()) except (AssertionError, json.JSONDecodeError): - self_local.data = {} + data = {} except Exception as err: + # something went wrong, unable to load session data if DEBUG: logging.debug("Session error %s", err) + # if the session data is valid update the current session if ( - self.local.data.get("session_cookie_name") != self_local.session_cookie_name - or self.local.data.get("secure") != self_local.secure - or "uuid" not in self.local.data - or ( - params.expiration is not None - and params.storage is None - and self_local.data["timestamp"] < time.time() - int(params.expiration) + data.get("cookie_name") == self_local.cookie_name # have valid cookie + and data.get("secure") == self_local.secure # have valid security + and data.get("uuid") is not None # have a uuid + and ( + params.expiration is None # has not expired + or data["timestamp"] > time.time() - int(params.expiration) ) ): - self.clear() + self_local.data.update(data) # the take the loaded data def save(self): + """Saves the session""" params = self.params self_local = self.local + # make sure the session constain these basic veriables + if "uuid" not in self_local.data: + self_local.data["uuid"] = str(uuid.uuid4()) self_local.data["timestamp"] = time.time() - self_local.data["session_cookie_name"] = self_local.session_cookie_name + self_local.data["secure"] = self_local.secure + self_local.data["cookie_name"] = self_local.cookie_name if params.storage: cookie_data = self_local.data["uuid"] params.storage.set( @@ -714,7 +772,7 @@ def save(self): if DEBUG: logging.debug("Session stored %s", cookie_data) response.set_cookie( - self_local.session_cookie_name, + self_local.cookie_name, cookie_data, path="/", secure=self_local.secure, @@ -723,33 +781,41 @@ def save(self): ) def get(self, key, default=None): + """Get the value for the key from session""" try: return self.local.data.get(key, default) - except MetaLocalUndefined: + except LocalUndefined: return default def __getitem__(self, key): + """Session key getter""" return self.local.data[key] def __delitem__(self, key): + """Deletes a key from the session""" if key in self.local.data: self.local.changed = True del self.local.data[key] def __setitem__(self, key, value): + """Session key setter""" self.local.changed = True self.local.data[key] = value def __contains__(self, other): + """Checks if a key in session""" return other in self.local.data def keys(self): + """Returns the session keys""" return self.local.data.keys() def items(self): + """Returns the (key, value) in session""" return self.local.data.items() def __iter__(self): + """Iterates over the session keys""" yield from self.local.data.keys() __getattr__ = get @@ -757,22 +823,18 @@ def __iter__(self): __delattr__ = __delitem__ def clear(self): - """Produces a brand-new session.""" + """Clears the session key""" self_local = self.local self_local.changed = True self_local.data.clear() - self_local.data["uuid"] = str(uuid.uuid4()) - self_local.data["secure"] = self_local.secure def on_request(self, context): - MetaLocal.local_initialize(self) + """Initializes the session thread local and tries to load a session""" + Fixture.local_initialize(self) self.load() - def on_error(self, context): - if self.local.changed: - self.save() - def on_success(self, context): + """Saves the session if its content changed""" if self.local.changed: self.save() @@ -782,16 +844,18 @@ def on_success(self, context): ######################################################################################### -def URL( +def URL( # pylint: disable=invalid-name *parts, - vars=None, - hash=None, + vars=None, # pylint: disable=redefined-builtin + hash=None, # pylint: disable=redefined-builtin scheme=False, signer=None, use_appname=None, static_version=None, ): """ + Generates a URL for the action. + Examples: URL('a','b',vars=dict(x=1),hash='y') -> /{script_name?}/{app_name?}/a/b?x=1#y URL('a','b',vars=dict(x=1),scheme=None) -> //{domain}/{script_name?}/{app_name?}/a/b?x=1 @@ -814,16 +878,16 @@ def URL( if parts and parts[0].startswith("/"): prefix = "" elif has_appname and app_name != "_default": - prefix = "%s/%s/" % (script_name, app_name) + prefix = f"{script_name}/{app_name}/" else: - prefix = "%s/" % script_name + prefix = f"{script_name}/" broken_parts = [] for part in parts: broken_parts += str(part).rstrip("/").split("/") if static_version != "" and broken_parts and broken_parts[0] == "static": if not static_version: # try to retrieve from __init__.py - app_module = "apps.%s" % app_name if has_appname else "apps" + app_module = f"apps.{app_name}" if has_appname else "apps" try: static_version = getattr( sys.modules[app_module], "__static_version__", None @@ -843,10 +907,10 @@ def URL( signer.sign(prefix + "/".join(broken_parts), urlvars) if urlvars: url += "?" + "&".join( - "%s=%s" % (k, urllib.parse.quote(str(v))) for k, v in urlvars.items() + f"{k}={urllib.parse.quote(str(v))}" for k, v in urlvars.items() ) if hash: - url += "#%s" % hash + url += f"#{hash}" if scheme is not False: original_url = request.environ.get("HTTP_ORIGIN") or request.url orig_scheme, _, domain = original_url.split("/", 3)[:3] @@ -856,7 +920,7 @@ def URL( scheme = "" else: scheme += ":" - url = "%s//%s%s" % (scheme, domain, url) + url = f"{scheme}//{domain}{url}" return url @@ -868,31 +932,33 @@ def URL( class HTTP(BaseException): """An exception that is considered success""" - def __init__(self, status, body="", headers={}): + def __init__(self, status, body="", headers=None): + """Makes an HTTP object""" self.status = status self.body = body - self.headers = headers + self.headers = headers or {} def redirect(location): - """raises HTTP(303) to the specified location""" - response.headers["Location"] = location + """Raises HTTP(303) to redirect to the specified location""" + response.headers.setdefault("Location", location) raise HTTP(303) -class action: +class action: # pylint: disable=invalid-name """@action(...) is a decorator for functions to be exposed as actions""" registered = set() app_name = "_default" def __init__(self, path, **kwargs): + """Constructs the action decorator""" self.path = path self.kwargs = kwargs @staticmethod def uses(*fixtures_in): - """Find all fixtures, including dependencies, topologically sorted""" + """Used to declare needed fixtures, they will be topologically sorted""" fixtures = [] reversed_fixtures = [] stack = list(fixtures_in) @@ -909,15 +975,15 @@ def uses(*fixtures_in): def decorator(func): if DEBUG: # in debug mode log all calls to fixtures - def call(f, context): + def call_f(f, context): logging.debug( - f"Calling {f.__self__.__class__.__name__}.{f.__name__}" + "Calling %s.%s", f.__self__.__class__.__name__, f.__name__ ) return f(context) else: - def call(f, context): + def call_f(f, context): return f(context) @functools.wraps(func) @@ -933,12 +999,12 @@ def wrapper(*args, **kwargs): } try: for fixture in fixtures: - call(fixture.on_request, context) + call_f(fixture.on_request, context) processed.append(fixture) context["output"] = func(*args, **kwargs) - except HTTP as http: - context["status"] = http.status - raise http + except HTTP as http_exception: + context["status"] = http_exception.status + raise http_exception except bottle.HTTPError as error: context["exception"] = error except bottle.HTTPResponse: @@ -950,12 +1016,12 @@ def wrapper(*args, **kwargs): if fixture in processed or getattr(fixture, "is_hook", False): try: if context.get("exception"): - call(fixture.on_error, context) + call_f(fixture.on_error, context) else: - call(fixture.on_success, context) - except Exception as error: - context["exception"] = context.get("exception") or error - safely(lambda: MetaLocal.local_delete(fixture)) + call_f(fixture.on_success, context) + except Exception as err: + context["exception"] = context.get("exception") or err + safely(lambda: Fixture.local_delete(fixture)) exception = context.get("exception") if isinstance(exception, Exception): raise exception @@ -975,7 +1041,7 @@ def wrapper(*func_args, **func_kwargs): request.app_name = app_name ret = func(*func_args, **func_kwargs) if isinstance(ret, (list, dict)): - response.headers["Content-Type"] = "application/json" + response.headers.setdefault("Content-Type", "application/json") ret = dumps(ret) elif ret is None: ret = "" @@ -984,14 +1050,14 @@ def wrapper(*func_args, **func_kwargs): elif not hasattr(ret, "__iter__"): raise RuntimeError(f"Cannot return type {ret.__class__.__name__}") return ret - except HTTP as http: - response.status = http.status - response.headers.update(http.headers) - body = http.body + except HTTP as http_exception: + response.status = http_exception.status + response.headers.update(http_exception.headers) + body = http_exception.body return dumps(body) if isinstance(body, (list, dict)) else str(body) except bottle.HTTPResponse: raise - except Exception: + except Exception: # pylint: disable=broad-exception-caught snapshot = get_error_snapshot() logging.error(snapshot["traceback"]) ticket_uuid = error_logger.log(request.app_name, snapshot) or "unknown" @@ -1020,12 +1086,16 @@ def __call__(self, func): class Condition(Fixture): + """The Condition Fixture""" + def __init__(self, condition, on_false=None, exception=HTTP(400)): + """Creates a fixture that checks for a given condition""" self.condition = condition self.on_false = on_false self.exception = exception def on_request(self, context): + """Checks if the condition is true or false""" if not self.condition(): if self.on_false is not None: self.on_false() @@ -1036,7 +1106,9 @@ def on_request(self, context): # Monkey Patch: Cookies ######################################################################################### -http.cookies.Morsel._reserved["same-site"] = "SameSite" +http.cookies.Morsel._reserved[ # pylint: disable=protected-access + "same-site" +] = "SameSite" ######################################################################################### # Monkey Patch: ssl bug for gevent @@ -1056,6 +1128,7 @@ def new_sslwrap( ca_certs=None, ciphers=None, ): + """Used to support HTTP""" context = __ssl__.SSLContext(ssl_version) context.verify_mode = cert_reqs or __ssl__.CERT_NONE if ca_certs: @@ -1065,7 +1138,9 @@ def new_sslwrap( if ciphers: context.set_ciphers(ciphers) caller_self = inspect.currentframe().f_back.f_locals["self"] - return context._wrap_socket(sock, server_side=server_side, ssl_sock=caller_self) + return context._wrap_socket( # pylint: disable=protected-access + sock, server_side=server_side, ssl_sock=caller_self + ) ######################################################################################### @@ -1085,7 +1160,7 @@ def get_error_snapshot(depth=5): elif errorlog == ":stdout": sys.stdout.write(msg) elif errorlog == "tickets_only": - pass + ... else: with portalocker.Lock(errorlog, "a", timeout=2) as fp: fp.write(msg) @@ -1126,9 +1201,9 @@ def get_error_snapshot(depth=5): del etb # Prevent circular references that would cause memory leaks data["stackframes"] = stackframes = [] - for frame, file, lnum, func, lines, idx in items: + for frame, file, lnum, func, lines, idx in items: # pylint: disable=unused-variable file = file and os.path.abspath(file) or "?" - args, varargs, varkw, locals = inspect.getargvalues(frame) + # TODO: call inspect.getargvalues(frame) and get more info # Basic frame information f = {"file": file, "func": func, "lnum": lnum} f["code"] = lines @@ -1138,15 +1213,18 @@ def get_error_snapshot(depth=5): class SimpleErrorLogger: + """Simple Error Logger""" + def log(self, app_name, snapshot): - """logs the error""" - logging.error("%s error:\n%s" % (app_name, snapshot["traceback"])) - return None + """Logs the error""" + logging.error("%s error:\n%s", app_name, snapshot["traceback"]) class DatabaseErrorLogger: + """Database Error Logger""" + def __init__(self): - """creates the py4web_error table in the service database""" + """Creates the py4web_error table in the service database""" uri = os.environ["PY4WEB_SERVICE_DB_URI"] folder = os.environ["PY4WEB_SERVICE_FOLDER"] self.db = DAL(uri, folder=folder) @@ -1164,7 +1242,7 @@ def __init__(self): self.db.commit() def log(self, app_name, error_snapshot): - """store error snapshot (ticket) in the database""" + """Store error snapshot (ticket) in the database""" ticket_uuid = str(uuid.uuid4()) try: self.db.py4web_error.insert( @@ -1179,13 +1257,13 @@ def log(self, app_name, error_snapshot): ) self.db.commit() return ticket_uuid - except Exception as err: + except Exception as err: # pylint: disable=broad-exception-caught logging.error(str(err)) self.db.rollback() return None def get(self, ticket_uuid=None): - """retrieve a ticket from error database""" + """Retrieve a ticket from error database""" db = self.db if ticket_uuid: query, orderby = db.py4web_error.uuid == ticket_uuid, None @@ -1210,7 +1288,7 @@ def get(self, ticket_uuid=None): return rows if not ticket_uuid else rows[0] if rows else None def clear(self): - """erase all tickets from database""" + """Erase all tickets from database""" db = self.db db(db.py4web_error).delete() self.db.commit() @@ -1234,17 +1312,17 @@ def __init__(self): self.plugins = {} def initialize(self): - """try inizalize database if we have service folder""" + """Try inizalize database if we have service folder""" self.database_logger = safely(DatabaseErrorLogger, log=True) def _get_logger(self, app_name): - """get the appropriate logger for the app""" + """Get the appropriate logger for the app""" return ( self.plugins.get(app_name) or self.database_logger or self.fallback_logger ) def log(self, app_name, error_snapshot): - """log the error snapshot""" + """Log the error snapshot""" logger = self._get_logger(app_name) ticket_uuid = safely(lambda: logger.log(app_name, error_snapshot)) if not ticket_uuid: @@ -1259,23 +1337,21 @@ def log(self, app_name, error_snapshot): ######################################################################################### -class StreamProxy: - def __init__(self, stream): - self._stream = stream - - def write(self, *args, **kwargs): - return self._stream.write(*args, **kwargs) - - class Reloader: + """ + Class responsible for loading/readloading apps + """ + ROUTES = collections.defaultdict(list) MODULES = {} ERRORS = {} @staticmethod def install_reloader_hook(): + """Installs the Reloader hook, checks for changes at every request""" + # used by watcher - def hook(*a, **k): + def hook(*args, **kwargs): # pylint: disable=unused-argument app_name = request.path.split("/")[1] if app_name not in Reloader.ROUTES: app_name = "_default" @@ -1289,6 +1365,7 @@ def hook(*a, **k): @staticmethod def clear_routes(app_names=None): + """Clears all stored routes""" remove_route = bottle.default_app().router.remove if app_names is None: app_names = Reloader.ROUTES.keys() @@ -1305,7 +1382,7 @@ def import_apps(): # if first time reload dummy top module if not Reloader.MODULES: path = os.path.join(folder, "__init__.py") - module = load_module("apps", path) # noqa: F841 + load_module("apps", path) # noqa: F841 # Then load all the apps as submodules if os.environ.get("PY4WEB_APP_NAMES"): app_names = os.environ.get("PY4WEB_APP_NAMES").split(",") @@ -1316,6 +1393,7 @@ def import_apps(): @staticmethod def import_app(app_name, clear_before_import=True): + """Imports a specified app and its routes""" if clear_before_import: Reloader.clear_routes([app_name]) Reloader.ROUTES[app_name] = [] @@ -1325,7 +1403,7 @@ def import_app(app_name, clear_before_import=True): if os.path.isdir(path) and not path.endswith("__") and os.path.exists(init): action.app_name = app_name - module_name = "apps.%s" % app_name + module_name = f"apps.{app_name}" def clear_modules(): # all files/submodules @@ -1340,40 +1418,39 @@ def clear_modules(): try: module = Reloader.MODULES.get(app_name) if not module: - click.echo("[ ] loading %s ..." % app_name) + click.echo(f"[ ] loading {app_name} ...") else: - click.echo("[ ] reloading %s ..." % app_name) + click.echo(f"[ ] reloading {app_name} ...") # forget the module del Reloader.MODULES[app_name] clear_modules() load_module_message = None - buf_out = StreamProxy(io.StringIO()) - buf_err = StreamProxy(buf_out._stream) + buf_out = io.StringIO() + buf_err = buf_out with redirect_stdout(buf_out), redirect_stderr(buf_err): module = load_module(module_name, init) - load_module_message = buf_out._stream.getvalue() - buf_out._stream.close() - buf_out._stream = sys.stdout - buf_err._stream = sys.stderr + load_module_message = buf_out.getvalue() + buf_out.close() + buf_out = sys.stdout + buf_err = sys.stderr if load_module_message: - click.secho("\x1b[A output %s " % app_name, fg="yellow") + click.secho(f"\x1b[A output {app_name} ", fg="yellow") click.echo(load_module_message) - click.secho("\x1b[A[X] loaded %s " % app_name, fg="green") + click.secho(f"\x1b[A[X] loaded {app_name} ", fg="green") Reloader.MODULES[app_name] = module Reloader.ERRORS[app_name] = None - except Exception as err: + except Exception as err: # pylint: disable=broad-exception-caught Reloader.ERRORS[app_name] = traceback.format_exc() error_logger.log(app_name, get_error_snapshot()) click.secho( - "\x1b[A[FAILED] loading %s (%s)" % (app_name, err), + f"\x1b[A[FAILED] loading {app_name} ({err})", fg="red", ) # clear all files/submodules if the loading fails clear_modules() - return None # Expose static files with support for static asset management static_folder = os.path.join(path, "static") @@ -1384,6 +1461,7 @@ def clear_modules(): path = prefix + r"/static/" def server_static(fp, static_folder=static_folder): + """Action that serves static/ files""" filename = fp response.headers.setdefault("Pragma", "cache") response.headers.setdefault("Cache-Control", "private") @@ -1391,10 +1469,13 @@ def server_static(fp, static_folder=static_folder): Reloader.register_route(app_name, path, {"method": "GET"}, server_static) + # Very important to make sure actions can modify Field attributes + # in a thread safe manner ICECUBE.update(threadsafevariable.ThreadSafeVariable.freeze()) @staticmethod def register_route(app_name, rule, kwargs, func): + """Given an app_name and a rule registers the corresponding routes""" url_prefix = os.environ.get("PY4WEB_URL_PREFIX", "") if url_prefix and rule == "/": rule = "" @@ -1432,18 +1513,19 @@ def register_route(app_name, rule, kwargs, func): } -def error_page(code, button_text=None, href="#", color=None, message=None): +def error_page(token, button_text=None, href="#", color=None, message=None): + """Generates an error page""" if button_text: button_text = sanitize_html.escape(button_text) href = sanitize_html.escape(href) - message = http.client.responses[code].upper() if message is None else message + message = http.client.responses[token].upper() if message is None else message color = ( {"4": "#F44336", "5": "#607D8B"}.get(str(code)[0], "#2196F3") if not color else color ) context = dict( - code=code, message=message, button_text=button_text, href=href, color=color + code=token, message=message, button_text=button_text, href=href, color=color ) # if client accepts 'application/json' - return json if re.search(REGEX_APPJSON, request.headers.get("accept", "")): @@ -1455,7 +1537,8 @@ def error_page(code, button_text=None, href="#", color=None, message=None): @bottle.error(404) -def error404(error): +def error404(error): # pylint: disable=unused-argument + """Generates a 404 page""" guess_app_name = ( "index" if request.environ.get("HTTP_X_PY4WEB_APPNAME") @@ -1493,13 +1576,14 @@ def sass_compile(changed_files): def app_watch_handler(watched_app_subpaths): + """Finds files to watch for changes""" stack = inspect.stack invoker = pathlib.Path(stack()[1].filename) apps_path = pathlib.Path(os.environ["PY4WEB_APPS_FOLDER"]) app = invoker.relative_to(os.environ["PY4WEB_APPS_FOLDER"]).parts[0] def decorator(func): - handler = "{}.{}".format(func.__module__, func.__name__) + handler = f"{func.__module__}.{func.__name__}" APP_WATCH["handlers"][handler] = func for subpath in watched_app_subpaths: app_path = apps_path.joinpath(app, subpath).as_posix() @@ -1512,6 +1596,7 @@ def decorator(func): def try_app_watch_tasks(): + """If there are watch tasks, executes them when files change""" if APP_WATCH["tasks"]: tried_tasks = [] for handler in APP_WATCH["tasks"]: @@ -1519,7 +1604,7 @@ def try_app_watch_tasks(): try: APP_WATCH["handlers"][handler](changed_files_dict.keys()) tried_tasks.append(handler) - except Exception: + except Exception: # pylint: disable=broad-exception-caught logging.error(traceback.format_exc()) ## remove executed tasks from register for handler in tried_tasks: @@ -1527,15 +1612,17 @@ def try_app_watch_tasks(): def watch(apps_folder, server_config, mode="sync"): + """Watches files for change""" + def watch_folder_event_loop(apps_folder): + """Main event loop looking for file changes""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(watch_folder(apps_folder)) async def watch_folder(apps_folder): - click.echo( - "watching (%s-mode) python file changes in: %s" % (mode, apps_folder) - ) + """Async function that watches a folder for changes""" + click.echo(f"watching ({mode}-mode) python file changes in: {apps_folder}") async for changes in awatch(os.path.join(apps_folder)): apps = [] for subpath in [pathlib.Path(pair[1]) for pair in changes]: @@ -1562,7 +1649,8 @@ async def watch_folder(apps_folder): if server_config["number_workers"] > 1: click.echo("--watch option has no effect in multi-process environment \n") return - elif server_config["server"].startswith(("wsgiref", "waitress", "rocket")): + + if server_config["server"].startswith(("wsgiref", "waitress", "rocket")): # these servers block the main thread so we open a new thread for the file watcher threading.Thread( target=watch_folder_event_loop, args=(apps_folder,), daemon=True @@ -1584,10 +1672,11 @@ async def watch_folder(apps_folder): def log_routes(apps_routes, out_file="routes-py4web.txt"): + """Logs defined routes to a file""" tmp = os.environ.get("TEMPDIR", "/tmp") path_out_file = os.path.join(tmp, out_file) try: - with open(path_out_file, "w") as f: + with open(path_out_file, "w", encoding="utf8") as f: f.write( "\n".join( [ @@ -1602,6 +1691,7 @@ def log_routes(apps_routes, out_file="routes-py4web.txt"): def start_server(kwargs): + """Starts the web server""" host = kwargs["host"] port = int(kwargs["port"]) apps_folder = kwargs["apps_folder"] @@ -1631,11 +1721,9 @@ def start_server(kwargs): signal.signal( signal.SIGINT, lambda signal, frame: click.echo( - "KeyboardInterrupt (ID: {}) has been caught. Cleaning up...".format( - signal - ) - and sys.exit(0), - ), + "KeyboardInterrupt (ID: {signal}) has been caught. Cleaning up..." + ) + and sys.exit(0), ) params["server"] = server_config["server"] @@ -1653,7 +1741,7 @@ def start_server(kwargs): if kwargs["watch"] != "off": print("Error: watch doesn't work with gevent. ") print("invoke py4web with `--watch off` or choose another server. ") - exit(255) + sys.exit(255) if not hasattr(_ssl, "sslwrap"): _ssl.sslwrap = new_sslwrap @@ -1669,12 +1757,12 @@ def start_server(kwargs): bottle.run(**params) -def check_compatible(version): +def check_compatible(py4web_version): """To be called by apps to check if module version is compatible with py4web requirements""" - from . import __version__ + from . import __version__ # pylint: disable=import-outside-toplevel return tuple(map(int, __version__.split("."))) >= tuple( - map(int, version.split(".")) + map(int, py4web_version.split(".")) ) @@ -1708,7 +1796,8 @@ def __init__(self, pkg, pkg_alias="apps"): # register as path finder sys.meta_path.append(self) - def find_spec(self, fullname, path=None, target=None): + def find_spec(self, fullname, path=None): + """Loads the spec for the module at fullname""" if fullname == self.pkg_alias and path is None: spec = importlib.util.find_spec(self.pkg) if spec: @@ -1717,9 +1806,11 @@ def find_spec(self, fullname, path=None, target=None): fullname, spec.origin ) return spec + return None -def install_args(kwargs, reinstall_apps=False): +def install_args(kwargs, reinstall_apps=False): # pylint: disable=too-many-statements + """Handles the command line argumens and adds them to the os.environ""" # always convert apps_folder to an absolute path apps_folder = kwargs["apps_folder"] = os.path.abspath(kwargs["apps_folder"]) kwargs["service_folder"] = os.path.join( @@ -1728,14 +1819,17 @@ def install_args(kwargs, reinstall_apps=False): kwargs["service_db_uri"] = DEFAULTS["PY4WEB_SERVICE_DB_URI"] for key, val in kwargs.items(): os.environ["PY4WEB_" + key.upper()] = str(val) + + global DEBUG # pylint: disable=global-statement DEBUG = kwargs.get("debug", False) + logging.getLogger().setLevel( 0 if DEBUG else kwargs.get("logging_level", logging.WARNING) ) yes2 = yes = kwargs.get("yes", False) # If the apps folder does not exist create it and populate it if not os.path.exists(apps_folder): - if yes or click.confirm("Create missing folder %s?" % apps_folder): + if yes or click.confirm(f"Create missing folder {apps_folder}?"): os.makedirs(apps_folder) yes2 = True else: @@ -1743,9 +1837,9 @@ def install_args(kwargs, reinstall_apps=False): sys.exit(0) init_py = os.path.join(apps_folder, "__init__.py") if not os.path.exists(init_py): - if yes2 or click.confirm("Create missing init file %s?" % init_py): + if yes2 or click.confirm(f"Create missing init file {init_py}?"): with open(init_py, "wb"): - pass + ... else: click.echo("Command aborted") sys.exit(0) @@ -1760,10 +1854,10 @@ def install_args(kwargs, reinstall_apps=False): os.mkdir(kwargs["service_folder"]) session_secret_filename = os.path.join(kwargs["service_folder"], "session.secret") if not os.path.exists(session_secret_filename): - with open(session_secret_filename, "w") as fp: + with open(session_secret_filename, "w", encoding="utf8") as fp: fp.write(str(uuid.uuid4())) - with open(session_secret_filename) as fp: + with open(session_secret_filename, "r", encoding="utf8") as fp: Session.SECRET = fp.read() # after everything is etup but before installing apps, init @@ -1781,8 +1875,8 @@ def install_args(kwargs, reinstall_apps=False): app_name = filename.split(".")[-2] target_dir = os.path.join(apps_folder, app_name) if not os.path.exists(target_dir): - if yes or click.confirm("Create app %s?" % app_name): - click.echo("[ ] Unzipping app %s" % filename) + if yes or click.confirm(f"Create app {app_name}?"): + click.echo(f"[ ] Unzipping app {filename}") with zipfile.ZipFile(zip_filename, "r") as zip_file: os.makedirs(target_dir) zip_file.extractall(target_dir) @@ -1803,28 +1897,28 @@ def wsgi(**kwargs): @click.group( context_settings=dict(help_option_names=["-h", "-help", "--help"]), - help='%s\n\nType "%s COMMAND -h" for available options on commands' - % (__doc__, PY4WEB_CMD), + help=f'{__doc__}\n\nType "{PY4WEB_CMD} COMMAND -h" for available options on commands', ) def cli(): - pass + """The Command Line Interface""" @cli.command() @click.option( "-a", "--all", is_flag=True, default=False, help="List version of all modules" ) -def version(all): +def version(verbose=False): """Show versions and exit""" - from . import __version__ + from . import __version__ # pylint: disable=import-outside-toplevel - click.echo("py4web: %s" % __version__) - if all: - click.echo("system: %s" % platform.platform()) - click.echo("python: %s" % sys.version.replace("\n", " ")) + click.echo(f"py4web: {__version__}") + if verbose: + sys_version = sys.version.replace("\n", " ") + click.echo(f"system: {platform.platform()}") + click.echo(f"python: {sys_version}") for name in sorted(sys.modules): if hasattr(sys.modules[name], "__version__"): - click.echo("%s: %s" % (name, sys.modules[name].__version__)) + click.echo(f"{name}: {sys.modules[name].__version__}") @cli.command() @@ -1856,7 +1950,7 @@ def shell(**kwargs): """Open a python shell with apps_folder's parent added to the path""" if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"): # running in the PyInstaller binary bundle - import site # noqa: F401 + import site # pylint: disable=possibly-unused-variable,import-outside-toplevel install_args(kwargs) code.interact(local=dict(globals(), **locals())) @@ -1884,9 +1978,9 @@ def call(apps_folder, func, yes, args): install_args(dict(apps_folder=apps_folder, yes=yes)) apps_folder_name = os.path.basename(os.environ["PY4WEB_APPS_FOLDER"]) app_name = func.split(".")[0] - module, name = ("%s.%s" % (apps_folder_name, func)).rsplit(".", 1) + module, name = f"{apps_folder_name}.{func}".rsplit(".", 1) env = {} - exec("from %s import %s" % (module, name), {}, env) + exec(f"from {module} import {name}", {}, env) # pylint: disable=exec-used request.app_name = app_name env[name](**kwargs) @@ -1908,8 +2002,8 @@ def call(apps_folder, func, yes, args): ) def set_password(password, password_file): """Set administrator's password for the Dashboard""" - click.echo('Storing the hashed password in file "%s"\n' % password_file) - with open(password_file, "w") as fp: + click.echo(f'Storing the hashed password in file "{password_file}"\n') + with open(password_file, "w", encoding="utf8") as fp: fp.write(str(pydal.validators.CRYPT()(password)[0])) @@ -1939,15 +2033,14 @@ def new_app(apps_folder, app_name, yes, scaffold_zip): ) target_dir = os.path.join(os.environ["PY4WEB_APPS_FOLDER"], app_name) if not os.path.exists(source): - click.echo("Source app %s does not exists" % source) + click.echo(f"Source app {source} does not exists") sys.exit(1) elif os.path.exists(target_dir): - click.echo("Target folder %s already exists" % target_dir) + click.echo(f"Target folder {target_dir} already exists") sys.exit(1) else: - zfile = zipfile.ZipFile(source, "r") - zfile.extractall(target_dir) - zfile.close() + with zipfile.ZipFile(source, "r") as zfile: + zfile.extractall(target_dir) @cli.command() @@ -2069,10 +2162,8 @@ def run(**kwargs): """Run the applications on apps_folder""" install_args(kwargs) - from py4web import __version__ - click.secho(ART, fg="blue") - click.echo("Py4web: %s on Python %s\n\n" % (__version__, sys.version)) + click.echo(f"Py4web: {version} on Python {sys.version}\n\n") # Start Reloader.import_apps() @@ -2083,8 +2174,7 @@ def run(**kwargs): kwargs["password_file"] ): click.echo( - 'You have not set a dashboard password. Run "%s set_password" to do so.' - % PY4WEB_CMD + f'You have not set a dashboard password. Run "{PY4WEB_CMD} set_password" to do so.' ) elif "_dashboard" in Reloader.ROUTES and ( not kwargs["host"].startswith("unix:/") diff --git a/tests/test_action.py b/tests/test_action.py index 6f819b73..1e229eb2 100644 --- a/tests/test_action.py +++ b/tests/test_action.py @@ -1,6 +1,8 @@ +# pylint: disable=assignment-from-none import copy import multiprocessing import os +import threading import time import unittest import uuid @@ -9,7 +11,7 @@ import requests from py4web import DAL, HTTP, Cache, Condition, Field, Session, abort, action -from py4web.core import Fixture, MetaLocal, bottle, error404, request +from py4web.core import Fixture, bottle, error404, request os.environ["PY4WEB_APPS_FOLDER"] = os.path.sep.join( os.path.normpath(__file__).split(os.path.sep)[:-2] @@ -29,14 +31,12 @@ @action.uses(db, session) @action.uses(Condition(lambda: True)) def index(): - db.thing.insert(name="test") + new_id = db.thing.insert(name="test") session["number"] = session.get("number", 0) + 1 # test copying Field ThreadSafe attr db.thing.name.default = "test_clone" - field_clone = copy.copy(db.thing.name) - clone_ok = 1 if field_clone.default == db.thing.name.default == "test_clone" else 0 - return "ok %s %s %s" % (session["number"], db(db.thing).count(), clone_ok) + return "ok %s %s %s" % (session["number"], db(db.thing).count(), new_id) def fail(): @@ -84,6 +84,20 @@ def run_server(): bottle.run(host="localhost", port=8001) +class FieldTest(unittest.TestCase): + """Check that we chat we can safely clone Field(s)""" + + def test_fiel_clone(self): + def test(): + db.thing.name.default = "test" + field_clone = copy.copy(db.thing.name) + assert field_clone.default == db.thing.name.default == "test" + + thread = threading.Thread(target=test) + thread.start() + thread.join() + + class CacheAction(unittest.TestCase): def setUp(self): self.server = multiprocessing.Process(target=run_server) @@ -104,23 +118,23 @@ def test_action(self): time.sleep(2) res = self.browser.open("http://127.0.0.1:8001/tests/index") - self.assertEqual(res.read(), b"ok 2 2 1") + self.assertEqual(res.read(), b"ok 2 2 2") def test_error(self): - res = requests.get("http://127.0.0.1:8001/tests/conditional") + res = requests.get("http://127.0.0.1:8001/tests/conditional", timeout=5) self.assertEqual(res.status_code, 404) - res = requests.get("http://127.0.0.1:8001/tests/raise300") + res = requests.get("http://127.0.0.1:8001/tests/raise300", timeout=5) self.assertEqual(res.status_code, 300) - res = requests.get("http://127.0.0.1:8001/tests/abort") + res = requests.get("http://127.0.0.1:8001/tests/abort", timeout=5) self.assertEqual(res.status_code, 400) - res = requests.get("http://127.0.0.1:8001/tests/abort_caught") + res = requests.get("http://127.0.0.1:8001/tests/abort_caught", timeout=5) self.assertEqual(res.status_code, 200) self.assertEqual(res.content, b"caught") - res = requests.get("http://127.0.0.1:8001/tests/bottle_httpresponse") + res = requests.get("http://127.0.0.1:8001/tests/bottle_httpresponse", timeout=5) self.assertEqual(res.status_code, 200) self.assertEqual(res.content, b"ok") diff --git a/tests/test_auth.py b/tests/test_auth.py index 4fafabdb..5327e93d 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -3,7 +3,7 @@ import unittest import uuid -from py4web.core import DAL, HTTP, Field, MetaLocal, Session, bottle, request, safely +from py4web.core import DAL, HTTP, Field, Fixture, Session, bottle, request, safely from py4web.utils.auth import Auth, AuthAPI SECRET = str(uuid.uuid4()) @@ -23,7 +23,7 @@ def setUp(self): def tearDown(self): # this is normally done by @action - safely(lambda: MetaLocal.local_delete(self.session)) + safely(lambda: Fixture.local_delete(self.session)) bottle.app.router.remove("/*") def action(self, name, method, query, data): @@ -34,18 +34,18 @@ def action(self, name, method, query, data): # we break a symmetry below. should fix in auth.py if name.startswith("api/"): return getattr(AuthAPI, name[4:])(self.auth) - else: - return getattr(self.auth.form_source, name)() + return getattr(self.auth.form_source, name)() - def on_request(self, context={}, keep_session=False): + def on_request(self, context=None, keep_session=False): # store the current session + context = context or {} try: storage = self.session.local.__dict__ except RuntimeError: storage = None # reinitialize everything - safely(lambda: MetaLocal.local_delete(self.session)) - safely(lambda: MetaLocal.local_delete(self.auth.flash)) + safely(lambda: Fixture.local_delete(self.session)) + safely(lambda: Fixture.local_delete(self.auth.flash)) self.session.on_request(context) self.auth.flash.on_request(context) self.auth.on_request(context) diff --git a/tests/test_cache.py b/tests/test_cache.py index f9fd065a..e5d91083 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -18,7 +18,7 @@ def test_logic(self): def test_different_keys(self): cache = py4web.Cache() results = set() - for k in range(100): + for _ in range(100): results.add(cache.get("a", random.random)) results.add(cache.get("b", random.random)) results.add(cache.get("c", random.random)) @@ -29,7 +29,9 @@ def test_change_detection(self): results = set() for k in range(30): results.add( - cache.get("a", random.random, expiration=0, monitor=lambda: int(k / 10)) + cache.get( + "a", random.random, expiration=0, monitor=lambda k=k: int(k / 10) + ) ) self.assertEqual(len(results), 3) time.sleep(0.02) @@ -42,7 +44,7 @@ def test_timing(self): for k in range(M): cache.get(k, random.random) t0 = time.time() - for k in range(N): + for _ in range(N): cache.get("new", random.random) self.assertTrue((time.time() - t0) / N, 1 - 5) self.assertTrue(cache.free == 0) @@ -55,11 +57,11 @@ def f(x): return x + random.random() results = set() - for k in range(10): + for _ in range(10): results.add(f(1)) results.add(f(2)) time.sleep(0.2) - for k in range(10): + for _ in range(10): results.add(f(1)) results.add(f(2)) self.assertEqual(len(results), 4) diff --git a/tests/test_fixture.py b/tests/test_fixture.py index 0e339c32..e1b4cbc2 100644 --- a/tests/test_fixture.py +++ b/tests/test_fixture.py @@ -1,9 +1,7 @@ import threading -from types import SimpleNamespace - import pytest -from py4web.core import Fixture, MetaLocal +from py4web.core import Fixture def run_thread(func, *a): @@ -11,9 +9,9 @@ def run_thread(func, *a): return t -class Foo(Fixture, MetaLocal): +class Foo(Fixture): def on_request(self, context): - MetaFixture.local_initialize(self) + Fixture.local_initialize(self) @property def bar(self): @@ -31,12 +29,12 @@ def bar(self, a): @pytest.fixture def init_foo(): def init(key, a, evnt_done=None, evnt_play=None): - MetaLocal.local_initialize(foo) + Fixture.local_initialize(foo) foo.bar = a evnt_done and evnt_done.set() evnt_play and evnt_play.wait() results[key] = foo.bar - MetaLocal.local_delete(foo) + Fixture.local_delete(foo) return foo return init diff --git a/tests/test_form.py b/tests/test_form.py index 1ff0e948..56876c30 100644 --- a/tests/test_form.py +++ b/tests/test_form.py @@ -1,5 +1,4 @@ import io -import os import unittest import uuid diff --git a/tests/test_get_error_snapshot.py b/tests/test_get_error_snapshot.py index dd074453..9c6b9afc 100644 --- a/tests/test_get_error_snapshot.py +++ b/tests/test_get_error_snapshot.py @@ -1,4 +1,3 @@ -import os import unittest from py4web.core import get_error_snapshot diff --git a/tests/test_json.py b/tests/test_json.py index b5230409..ddc5b45b 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -10,7 +10,7 @@ class TestJson(unittest.TestCase): def test_objectify(self): """Check if we can serialize objects, generators, and dates""" - class A(object): + class A: def __init__(self, x): self.x = x diff --git a/tests/test_main.py b/tests/test_main.py index 09a812e1..61a61357 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -26,9 +26,6 @@ def run_cli(): class MainTest(unittest.TestCase): def test_main(self): - class MyException(Exception): - pass - def handler(signum, frame): raise KeyboardInterrupt diff --git a/tests/test_session.py b/tests/test_session.py index 9e05464e..9c16a21c 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -9,7 +9,7 @@ import pytest from py4web import DAL, Session, request, response -from py4web.core import MetaLocal +from py4web.core import Fixture from py4web.utils.dbstore import DBStore @@ -33,7 +33,7 @@ def request_context(session, context={}): session.on_success(context) finally: session.on_error(context) - MetaLocal.local_delete(session) + Fixture.local_delete(session) class TestSession(unittest.TestCase): @@ -44,7 +44,6 @@ def test_session(self): with request_context(session): session["key"] = "value" assert "key" in session.local.data - cookie_name = session.local.session_cookie_name a, b = str(response._cookies)[len("Set-Cookie: ") :].split(";")[0].split("=", 1) b = unquote(b) @@ -67,7 +66,6 @@ def test_session_as_attributes(self): with request_context(session): session.key = "value" assert "key" in session.local.data - cookie_name = session.local.session_cookie_name a, b = str(response._cookies)[len("Set-Cookie: ") :].split(";")[0].split("=", 1) b = unquote(b) diff --git a/tests/test_template.py b/tests/test_template.py index 18168f3b..ff996493 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -1,7 +1,7 @@ import os import unittest -from py4web.core import Template, request +from py4web.core import Template PATH = os.path.join(os.path.dirname(__file__), "templates") diff --git a/tests/test_url.py b/tests/test_url.py index e57b78e6..993bf848 100644 --- a/tests/test_url.py +++ b/tests/test_url.py @@ -1,5 +1,3 @@ -import random -import time import unittest from py4web import URL, request