From db9956f68f5006385ade412273a33179779115bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=85=E7=AB=8B=E4=B8=9A=EF=BC=88Chris=20Fu=EF=BC=89?= <17433201@qq.com> Date: Thu, 25 Jul 2024 14:57:54 +0800 Subject: [PATCH] Bump version to 0.1.5 with an overhaul - `V` in `KeyPath[V]` has been restored as covariant. - Original `KeyPath.set` has been renamed to `KeyPath.unsafe_set` due to variance, and `KeyPath.set` itself is a deprecated method now. - Code has been reordered for better readability. - Tests have been reorganized. --- pyproject.toml | 3 +- runtime_keypath/_core.py | 186 +++++++++++--------- runtime_keypath/_core_test.py | 312 +++++++++++++++++++--------------- 3 files changed, 276 insertions(+), 225 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 153757c..df93866 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "runtime-keypath" -version = "0.1.4" +version = "0.1.5" authors = [{ name = "Chris Fu", email = "17433201@qq.com" }] description = "Supports runtime key-path recording/accessing for Python." classifiers = [ @@ -17,6 +17,7 @@ classifiers = [ readme = "README.md" license = { file = "LICENSE" } requires-python = ">=3.8" +dependencies = ["typing_extensions >= 4.9"] [project.urls] Homepage = "https://github.com/Azureblade3808/py-runtime-keypath" diff --git a/runtime_keypath/_core.py b/runtime_keypath/_core.py index 609f91b..6fe52f4 100644 --- a/runtime_keypath/_core.py +++ b/runtime_keypath/_core.py @@ -1,6 +1,3 @@ -# pyright: reportImplicitOverride=false - - from __future__ import annotations __all__ = [ @@ -8,48 +5,78 @@ "KeyPathSupporting", ] -# region[Keywords] - -from typing import TYPE_CHECKING, Final, Generic, Protocol, TypeVar, cast, final +import threading +from collections.abc import Sequence -# endregion[Keywords] +from typing_extensions import ( + TYPE_CHECKING, + Any, + Final, + Generic, + Protocol, + TypeVar, + cast, + deprecated, + final, + override, +) -# region[Types] +_Value_co = TypeVar("_Value_co", covariant=True) +_Value_0 = TypeVar("_Value_0") -if TYPE_CHECKING: - from typing import Any, Sequence +_MISSING = cast("Any", object()) -# endregion[Types] -import threading -from dataclasses import dataclass, field -from typing import NamedTuple +class KeyPathSupporting: + """ + A base class that indicates an object can be used as a chain in + `KeyPath.of(...)` call. + """ -_Value_t = TypeVar("_Value_t") + # ! This method is intentially not named as `__getattribute__`. See below for + # ! reason. + def _(self, key: str, /) -> Any: + try: + recorder = _thread_local.recorder + except AttributeError: + # There is no recorder, which means that `KeyPath.of` is not being called. + # So we don't need to record this key. + return super().__getattribute__(key) + if recorder.busy: + # The recorder is busy, which means that another member is being accessed, + # typically because the computation of that member is dependent on this one. + # So we don't need to record this key. + return super().__getattribute__(key) -class _ThreadLocalProtocol(Protocol): - recorder: _KeyPathRecorder - """ - The active key-path recorder for this thread. May not exist. - """ + recorder.busy = True + if recorder.start is not _MISSING and recorder.end is not self: + raise RuntimeError( + " ".join( + [ + "Key-path is broken. Check if there is something that does NOT", + "support key-paths in the member chain.", + ] + ) + ) -_thread_local = cast("_ThreadLocalProtocol", threading.local()) + value = super().__getattribute__(key) + recorder.busy = False + if recorder.start is _MISSING: + recorder.start = self + recorder.end = value + recorder.key_list.append(key) -@final -class _Terminals(NamedTuple): - start: Any - end: Any + return value + # ! `__getattribute__(...)` is declared against `TYPE_CHECKING`, so that unknown + # ! attributes on conforming classes won't be regarded as known by type-checkers. + if not TYPE_CHECKING: + __getattribute__ = _ -@final -@dataclass -class _KeyPathRecorder: - terminals: _Terminals | None = None - key_list: list[str] = field(default_factory=list) - busy: bool = False + del _ # ! A metaclass is made for class `KeyPath`, and `KeyPath.of` is provided as a property @@ -59,6 +86,7 @@ class _KeyPathRecorder: class _KeyPathMeta(type): @property def of(self, /) -> _KeyPathOfFunction: + # ! Docstring here is for Pylance hint. """ Returns the key-path for accessing a certain value from a target object with a key sequence such as `a.b.c`. @@ -128,6 +156,7 @@ def of(self, /) -> _KeyPathOfFunction: # ! exception occurred during the key-path access, there would still be a chance to # ! perform some finalization. class _KeyPathOfFunction: + # ! Docstring here is for runtime help. """ Returns the key-path for accessing a certain value from a target object with a key sequence such as `a.b.c`. @@ -174,7 +203,7 @@ class _KeyPathOfFunction: __invoked: bool = False - def __call__(self, value: _Value_t, /) -> KeyPath[_Value_t]: + def __call__(self, value: _Value_0, /) -> KeyPath[_Value_0]: self.__invoked = True try: @@ -193,16 +222,16 @@ def __call__(self, value: _Value_t, /) -> KeyPath[_Value_t]: assert not recorder.busy - terminals = recorder.terminals + start = recorder.start key_list = recorder.key_list - if terminals is None: + if start is _MISSING: assert len(key_list) == 0 raise RuntimeError("No key has been recorded.") else: assert len(key_list) > 0 - if terminals.end is not value: + if recorder.end is not value: raise RuntimeError( " ".join( [ @@ -212,7 +241,7 @@ def __call__(self, value: _Value_t, /) -> KeyPath[_Value_t]: ) ) - key_path = KeyPath(terminals.start, key_list) + key_path = KeyPath(start, key_list) return key_path def __del__(self, /) -> None: @@ -224,7 +253,11 @@ def __del__(self, /) -> None: @final -class KeyPath(Generic[_Value_t], metaclass=_KeyPathMeta): +class KeyPath(Generic[_Value_co], metaclass=_KeyPathMeta): + """ + An object that stands for a member chain from a base object. + """ + __target: Final[Any] __keys: Final[Sequence[str]] @@ -245,23 +278,29 @@ def target(self, /) -> Any: def keys(self, /) -> Sequence[str]: return self.__keys - def get(self, /) -> _Value_t: + def get(self, /) -> _Value_co: value = self.__target for key in self.__keys: value = getattr(value, key) return value - def set(self, value: _Value_t, /) -> None: + def unsafe_set(self: KeyPath[_Value_0], value: _Value_0, /) -> None: target = self.__target keys = self.__keys - n = len(keys) - 1 - for i in range(n): + i_last_key = len(keys) - 1 + for i in range(i_last_key): target = getattr(target, keys[i]) - setattr(target, keys[n], value) + setattr(target, keys[i_last_key], value) + @deprecated("`KeyPath.set` is deprecated. Use `KeyPath.unsafe_set` instead.") + def set(self: KeyPath[_Value_0], value: _Value_0, /) -> None: + return self.unsafe_set(value) + + @override def __hash__(self, /) -> int: return hash((self.target, self.keys)) + @override def __eq__(self, other: object, /) -> bool: return ( isinstance(other, KeyPath) @@ -269,58 +308,35 @@ def __eq__(self, other: object, /) -> bool: and self.keys == other.keys ) + @override def __repr__(self, /) -> str: return f"{KeyPath.__name__}(target={self.target!r}, keys={self.keys!r})" - def __call__(self, /) -> _Value_t: + def __call__(self, /) -> _Value_co: return self.get() -class KeyPathSupporting: - # ! This method is purposedly not named as `__getattribute__`. See below for reason. - def __getattribute_0__(self, key: str, /) -> Any: - try: - recorder = _thread_local.recorder - except AttributeError: - # There is no recorder, which means that `KeyPath.of` is not being called. - # So we don't need to record this key. - return super().__getattribute__(key) - - if recorder.busy: - # The recorder is busy, which means that another member is being accessed, - # typically because the computation of that member is dependent on this one. - # So we don't need to record this key. - return super().__getattribute__(key) - - recorder.busy = True - - terminals = recorder.terminals - if terminals is not None and terminals.end is not self: - raise RuntimeError( - " ".join( - [ - "Key-path is broken. Check if there is something that does NOT", - "support key-paths in the member chain.", - ] - ) - ) +class _ThreadLocalProtocol(Protocol): + recorder: _KeyPathRecorder + """ + The active key-path recorder for this thread. May not exist. + """ - value = super().__getattribute__(key) - if terminals is None: - terminals = _Terminals(self, value) - else: - terminals = terminals._replace(end=value) +_thread_local = cast("_ThreadLocalProtocol", threading.local()) - recorder.terminals = terminals - recorder.key_list.append(key) - recorder.busy = False - return value +@final +class _KeyPathRecorder: + __slots__ = ("busy", "start", "end", "key_list") - # ! `__getattribute__(...)` is declared against `TYPE_CHECKING`, so that unknown - # ! attributes on conforming classes won't be regarded as known by type-checkers. - if not TYPE_CHECKING: - __getattribute__ = __getattribute_0__ + busy: bool + start: Any + end: Any + key_list: list[str] - del __getattribute_0__ + def __init__(self, /) -> None: + self.busy = False + self.start = _MISSING + self.end = _MISSING + self.key_list = [] diff --git a/runtime_keypath/_core_test.py b/runtime_keypath/_core_test.py index 13bee5b..fb65b01 100644 --- a/runtime_keypath/_core_test.py +++ b/runtime_keypath/_core_test.py @@ -2,203 +2,237 @@ import time from threading import Thread +from typing import Any, cast import pytest from ._core import * -def test_normal() -> None: - class A(KeyPathSupporting): - b: B +class Tests: + @staticmethod + def test__normal() -> None: + class A(KeyPathSupporting): + b: B - def __init__(self) -> None: - self.b = B() + def __init__(self) -> None: + self.b = B() - class B(KeyPathSupporting): - c: int + class B(KeyPathSupporting): + c: int - def __init__(self) -> None: - self.c = 0 + def __init__(self) -> None: + self.c = 0 - a = A() - key_path = KeyPath.of(a.b.c) - assert key_path == KeyPath(target=a, keys=("b", "c")) - assert key_path() == 0 + a = A() + key_path = KeyPath.of(a.b.c) + assert key_path == KeyPath(target=a, keys=("b", "c")) + assert key_path() == 0 - a.b.c = 1 - assert key_path() == 1 + a.b.c = 1 + assert key_path() == 1 + @staticmethod + def test__cycle_reference() -> None: + class A(KeyPathSupporting): + a: A + b: B -def test_cycle_reference() -> None: - class A(KeyPathSupporting): - a: A - b: B + def __init__(self) -> None: + self.a = self + self.b = B() - def __init__(self) -> None: - self.a = self - self.b = B() + class B(KeyPathSupporting): + b: B + c: C - class B(KeyPathSupporting): - b: B - c: C + def __init__(self) -> None: + self.b = self + self.c = C() - def __init__(self) -> None: - self.b = self - self.c = C() + class C: + pass - class C: - pass + a = A() + assert KeyPath.of(a.a.b.b.c) == KeyPath(target=a, keys=("a", "b", "b", "c")) - a = A() - assert KeyPath.of(a.a.b.b.c) == KeyPath(target=a, keys=("a", "b", "b", "c")) + @staticmethod + def test__common_mistakes() -> None: + class A(KeyPathSupporting): + b: B + def __init__(self) -> None: + self.b = B() -def test_common_mistakes() -> None: - class A(KeyPathSupporting): - b: B + class B(KeyPathSupporting): + c: C - def __init__(self) -> None: - self.b = B() + def __init__(self) -> None: + self.c = C() - class B(KeyPathSupporting): - c: C + class C: + pass - def __init__(self) -> None: - self.c = C() + a = A() - class C: - pass + with pytest.raises(Exception): + # Not even accessed a single member. + _ = KeyPath.of(a) - a = A() + with pytest.raises(Exception): + # Using something that is not a member chain. + _ = KeyPath.of(id(a.b.c)) - with pytest.raises(Exception): - # Not even accessed a single member. - _ = KeyPath.of(a) + with pytest.raises(Exception): + # Calling the same `KeyPath.of` more than once. + of = KeyPath.of + _ = of(a.b.c) + _ = of(a.b.c) - with pytest.raises(Exception): - # Using something that is not a member chain. - _ = KeyPath.of(id(a.b.c)) + @staticmethod + def test__error_handling() -> None: + class A(KeyPathSupporting): + b: B - with pytest.raises(Exception): - # Calling the same `KeyPath.of` more than once. - of = KeyPath.of - _ = of(a.b.c) - _ = of(a.b.c) + def __init__(self) -> None: + self.b = B() + class B(KeyPathSupporting): + c: C -def test_error_handling() -> None: - class A(KeyPathSupporting): - b: B + def __init__(self) -> None: + self.c = C() - def __init__(self) -> None: - self.b = B() + class C: + pass - class B(KeyPathSupporting): - c: C + a = A() - def __init__(self) -> None: - self.c = C() + with pytest.raises(AttributeError): + # Accessing something that doesn't exist. + _ = KeyPath.of(a.b.c.d) # type: ignore - class C: - pass + # With above exception caught, normal code should run correctly. + key_path = KeyPath.of(a.b.c) + assert key_path == KeyPath(target=a, keys=("b", "c")) - a = A() + @staticmethod + def test__threading() -> None: + class A(KeyPathSupporting): + b: B - with pytest.raises(AttributeError): - # Accessing something that doesn't exist. - _ = KeyPath.of(a.b.c.d) # type: ignore + def __init__(self) -> None: + self.b = B() - # * With above exception caught, normal code should run correctly. - key_path = KeyPath.of(a.b.c) - assert key_path == KeyPath(target=a, keys=("b", "c")) + class B(KeyPathSupporting): + c: C + def __init__(self) -> None: + self.c = C() -def test_threading() -> None: - class A(KeyPathSupporting): - b: B + class C: + pass - def __init__(self) -> None: - self.b = B() + a = A() + key_path_list: list[KeyPath] = [] - class B(KeyPathSupporting): - c: C + def f() -> None: + # Sleeping for a short while so that the influence of starting a thread + # could be minimal. + time.sleep(1) - def __init__(self) -> None: - self.c = C() + key_path = KeyPath.of(a.b.c) + key_path_list.append(key_path) - class C: - pass + threads = [Thread(target=f) for _ in range(1000)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() - a = A() - key_path_list: list[KeyPath] = [] + assert len(key_path_list) == 1000 + assert all( + key_path == KeyPath(target=a, keys=("b", "c")) for key_path in key_path_list + ) - def f() -> None: - # Sleeping for a short while so that the influence of starting a thread could be - # minimal. - time.sleep(1) + @staticmethod + def test__internal_reference() -> None: + class C(KeyPathSupporting): + @property + def v0(self) -> int: + return self.v1.v2 - key_path = KeyPath.of(a.b.c) - key_path_list.append(key_path) + @property + def v1(self) -> C: + return self + + @property + def v2(self) -> int: + return 0 + + c = C() + assert KeyPath.of(c.v0) == KeyPath(target=c, keys=("v0",)) - threads = [Thread(target=f) for _ in range(1000)] - for thread in threads: - thread.start() - for thread in threads: - thread.join() + @staticmethod + def test__get() -> None: + MISSING = cast(Any, object()) - assert len(key_path_list) == 1000 - assert all( - key_path == KeyPath(target=a, keys=("b", "c")) for key_path in key_path_list - ) + class A(KeyPathSupporting): + b: B = MISSING + class B(KeyPathSupporting): + c: C = MISSING -def test_internal_reference() -> None: - class C(KeyPathSupporting): - @property - def v0(self) -> int: - return self.v1.v2 + class C(KeyPathSupporting): + v: int = MISSING - @property - def v1(self) -> C: - return self + a = A() + b = B() + c = C() - @property - def v2(self) -> int: - return 0 + key_path_0 = KeyPath.of(a.b) + assert key_path_0.get() is MISSING + a.b = b + assert key_path_0.get() is b - c = C() - assert KeyPath.of(c.v0) == KeyPath(target=c, keys=("v0",)) + key_path_1 = KeyPath.of(a.b.c) + assert key_path_1.get() is MISSING + a.b.c = c + assert key_path_1.get() is c + key_path_2 = KeyPath.of(a.b.c.v) + assert key_path_2.get() is MISSING + a.b.c.v = 12345 + assert key_path_2.get() == 12345 -def test_get_set() -> None: - class A(KeyPathSupporting): - b: B | None = None + @staticmethod + def test__unsafe_set() -> None: + MISSING = cast(Any, object()) - class B(KeyPathSupporting): - c: C | None = None + class A(KeyPathSupporting): + b: B = MISSING - class C(KeyPathSupporting): - v: int | None = None + class B(KeyPathSupporting): + c: C = MISSING - a = A() - b = B() - c = C() + class C(KeyPathSupporting): + v: int = MISSING - key_path_0 = KeyPath.of(a.b) - assert key_path_0.get() is None - key_path_0.set(b) - assert a.b is b - assert key_path_0.get() is b + a = A() + b = B() + c = C() - key_path_1 = KeyPath.of(a.b.c) # type: ignore - assert key_path_1.get() is None - key_path_1.set(c) - assert a.b.c is c # type: ignore - assert key_path_1.get() is c + assert a.b is MISSING + key_path_0 = KeyPath.of(a.b) + key_path_0.unsafe_set(b) + assert a.b is b - key_path_2 = KeyPath.of(a.b.c.v) # type: ignore - assert key_path_2.get() is None - key_path_2.set(12345) - assert a.b.c.v == 12345 # type: ignore - assert key_path_2.get() == 12345 + assert a.b.c is MISSING + key_path_1 = KeyPath.of(a.b.c) + key_path_1.unsafe_set(c) + assert a.b.c is c + + assert a.b.c.v is MISSING + key_path_2 = KeyPath.of(a.b.c.v) + key_path_2.unsafe_set(12345) + assert a.b.c.v == 12345