diff --git a/rectools/metrics/intersection.py b/rectools/metrics/intersection.py index 369917f0..60cdaae4 100644 --- a/rectools/metrics/intersection.py +++ b/rectools/metrics/intersection.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, Hashable, Optional, Union +from collections.abc import Hashable +from typing import Dict, Optional, Union import attr import numpy as np diff --git a/rectools/models/implicit_als.py b/rectools/models/implicit_als.py index ae74b17b..a1fe8d1e 100644 --- a/rectools/models/implicit_als.py +++ b/rectools/models/implicit_als.py @@ -22,7 +22,7 @@ from implicit.cpu.als import AlternatingLeastSquares as CPUAlternatingLeastSquares from implicit.gpu.als import AlternatingLeastSquares as GPUAlternatingLeastSquares from implicit.utils import check_random_state -from pydantic import BeforeValidator, ConfigDict, PlainSerializer, SerializationInfo, WrapSerializer +from pydantic import BeforeValidator, ConfigDict, SerializationInfo, WrapSerializer from scipy import sparse from tqdm.auto import tqdm @@ -30,7 +30,7 @@ from rectools.exceptions import NotFittedError from rectools.models.base import ModelConfig from rectools.utils.misc import get_class_or_function_full_path, import_object -from rectools.utils.serialization import RandomState +from rectools.utils.serialization import DType, RandomState from .rank import Distance from .vector import Factors, VectorModel @@ -68,10 +68,6 @@ def _serialize_alternating_least_squares_class( ), ] -DType = tpe.Annotated[ - np.dtype, BeforeValidator(func=np.dtype), PlainSerializer(func=lambda dtp: dtp.name, when_used="json") -] - class AlternatingLeastSquaresConfig(tpe.TypedDict): """Config for implicit `AlternatingLeastSquares` model.""" diff --git a/rectools/models/implicit_bpr.py b/rectools/models/implicit_bpr.py new file mode 100644 index 00000000..eaf30d3c --- /dev/null +++ b/rectools/models/implicit_bpr.py @@ 
-0,0 +1,239 @@
+import typing as tp
+from copy import deepcopy
+
+import numpy as np
+import typing_extensions as tpe
+from implicit.bpr import BayesianPersonalizedRanking
+from implicit.cpu.bpr import (
+    BayesianPersonalizedRanking as CPUBayesianPersonalizedRanking,  # pylint: disable=no-name-in-module
+)
+from implicit.gpu.bpr import (
+    BayesianPersonalizedRanking as GPUBayesianPersonalizedRanking,  # pylint: disable=no-name-in-module
+)
+from pydantic import BeforeValidator, ConfigDict, SerializationInfo, WrapSerializer
+
+from rectools.dataset.dataset import Dataset
+from rectools.exceptions import NotFittedError
+from rectools.models.base import ModelConfig
+from rectools.models.rank import Distance
+from rectools.models.vector import Factors, VectorModel
+from rectools.utils.misc import get_class_or_function_full_path, import_object
+from rectools.utils.serialization import DType, RandomState
+
+BPR_STRING = "BayesianPersonalizedRanking"
+
+AnyBayesianPersonalizedRanking = tp.Union[CPUBayesianPersonalizedRanking, GPUBayesianPersonalizedRanking]
+BayesianPersonalizedRankingType = tp.Union[
+    tp.Type[AnyBayesianPersonalizedRanking], tp.Literal["BayesianPersonalizedRanking"]
+]
+
+
+def _get_bpr_class(spec: tp.Any) -> tp.Any:
+    """Map the default aliases to `BPR_STRING`, import any other string spec, and pass other values through."""
+    if spec in (BPR_STRING, get_class_or_function_full_path(BayesianPersonalizedRanking)):
+        return BPR_STRING
+    if isinstance(spec, str):
+        return import_object(spec)
+    return spec
+
+
+def _serialize_bpr_class(
+    cls: BayesianPersonalizedRankingType, handler: tp.Callable, info: SerializationInfo
+) -> tp.Union[None, str, AnyBayesianPersonalizedRanking]:
+    """Dump stock BPR classes as `BPR_STRING`; others as a full path in JSON mode, else unchanged."""
+    if cls in (CPUBayesianPersonalizedRanking, GPUBayesianPersonalizedRanking) or cls == "BayesianPersonalizedRanking":
+        return BPR_STRING
+    if info.mode == "json":
+        return get_class_or_function_full_path(cls)
+    return cls
+
+
+BayesianPersonalizedRankingClass = tpe.Annotated[
+    BayesianPersonalizedRankingType,
+    BeforeValidator(_get_bpr_class),
+    WrapSerializer(
+        func=_serialize_bpr_class,
+        when_used="always",
+    ),
+]
+
+
+class BayesianPersonalizedRankingConfig(tpe.TypedDict):
+    """Config for implicit `BayesianPersonalizedRanking` model."""
+
+    cls: tpe.NotRequired[BayesianPersonalizedRankingClass]
+    factors: tpe.NotRequired[int]
+    learning_rate: tpe.NotRequired[float]
+    regularization: tpe.NotRequired[float]
+    dtype: tpe.NotRequired[DType]
+    num_threads: tpe.NotRequired[int]
+    iterations: tpe.NotRequired[int]
+    verify_negative_samples: tpe.NotRequired[bool]
+    random_state: tpe.NotRequired[tp.Union[RandomState, tp.Dict[str, tp.Any]]]
+    use_gpu: tpe.NotRequired[bool]
+
+
+class ImplicitBPRWrapperModelConfig(ModelConfig):
+    """Config for `ImplicitBPRWrapperModel`"""
+
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
+    model: BayesianPersonalizedRankingConfig
+
+
+class ImplicitBPRWrapperModel(VectorModel[ImplicitBPRWrapperModelConfig]):
+    """
+    Wrapper for `implicit.bpr.BayesianPersonalizedRanking` model.
+
+    See https://implicit.readthedocs.io/en/latest/bpr.html for details of the base model.
+
+    Parameters
+    ----------
+    model : BayesianPersonalizedRanking
+        Base model to wrap.
+    verbose : int, default ``0``
+        Degree of verbose output. If ``0``, no output will be provided.
+    """
+
+    recommends_for_warm = False
+    recommends_for_cold = False
+
+    u2i_dist = Distance.DOT
+    i2i_dist = Distance.COSINE
+
+    config_class = ImplicitBPRWrapperModelConfig
+
+    def __init__(self, model: AnyBayesianPersonalizedRanking, verbose: int = 0):
+        self._config = self._make_config(model, verbose)
+        super().__init__(verbose=verbose)
+        self.model: AnyBayesianPersonalizedRanking
+        self._model = model  # for refit
+
+        self.use_gpu = isinstance(model, GPUBayesianPersonalizedRanking)
+        if not self.use_gpu:
+            self.n_threads = model.num_threads
+
+    @classmethod
+    def _make_config(cls, model: AnyBayesianPersonalizedRanking, verbose: int) -> ImplicitBPRWrapperModelConfig:
+        """Build the wrapper config from the hyperparameters stored on the `implicit` model."""
+        model_cls = (
+            model.__class__
+            if model.__class__ not in (CPUBayesianPersonalizedRanking, GPUBayesianPersonalizedRanking)
+            else "BayesianPersonalizedRanking"
+        )
+        random_state = model.random_state
+        if isinstance(random_state, np.random.RandomState):
+            # `legacy=False` yields a dict (not the legacy tuple) that matches the config type and `set_state`
+            random_state = random_state.get_state(legacy=False)
+
+        inner_model_config = {
+            "cls": model_cls,
+            "factors": model.factors,
+            "learning_rate": model.learning_rate,
+            "regularization": model.regularization,
+            "iterations": model.iterations,
+            "verify_negative_samples": model.verify_negative_samples,
+            "random_state": random_state,
+        }
+        if isinstance(model, GPUBayesianPersonalizedRanking):
+            inner_model_config["use_gpu"] = True
+        else:
+            inner_model_config.update(
+                {
+                    "use_gpu": False,
+                    "dtype": model.dtype,
+                    "num_threads": model.num_threads,
+                }
+            )
+
+        return ImplicitBPRWrapperModelConfig(
+            cls=cls,
+            model=tp.cast(BayesianPersonalizedRankingConfig, inner_model_config),
+            verbose=verbose,
+        )
+
+    def _get_config(self) -> ImplicitBPRWrapperModelConfig:
+        return self._config
+
+    @classmethod
+    def _from_config(cls, config: ImplicitBPRWrapperModelConfig) -> tpe.Self:
+        """Instantiate the inner model from `config.model` and wrap it."""
+        inner_model_params = deepcopy(config.model)
+        inner_model_cls = inner_model_params.pop("cls", BayesianPersonalizedRanking)
+        inner_model_cls = tp.cast(tp.Callable, inner_model_cls)
+        random_state = inner_model_params.get("random_state")
+        if isinstance(random_state, dict):
+            # `np.random.set_state` would overwrite the global NumPy RNG and return ``None``,
+            # so the saved state is restored into a dedicated `RandomState` instead
+            restored_random_state = np.random.RandomState()
+            restored_random_state.set_state(random_state)
+            inner_model_params["random_state"] = restored_random_state
+        if inner_model_cls == BPR_STRING:
+            inner_model_cls = BayesianPersonalizedRanking
+        model = inner_model_cls(**inner_model_params)
+        return cls(model=model, verbose=config.verbose)
+
+    def _fit(self, dataset: Dataset) -> None:
+        """Fit a fresh copy of the wrapped model on the float32 user-item matrix of `dataset`."""
+        self.model = deepcopy(self._model)
+
+        ui_csr = dataset.get_user_item_matrix(include_weights=True).astype(np.float32)
+        self.model.fit(ui_csr, show_progress=self.verbose > 0)
+
+    def _get_users_factors(self, dataset: Dataset) -> Factors:
+        return Factors(get_users_vectors(self.model))
+
+    def _get_items_factors(self, dataset: Dataset) -> Factors:
+        return Factors(get_items_vectors(self.model))
+
+    def get_vectors(self) -> tp.Tuple[np.ndarray, np.ndarray]:
+        """
+        Return user and item vector representation from fitted model.
+
+        Returns
+        -------
+        (np.ndarray, np.ndarray)
+            User and item vectors.
+            Shapes are (n_users, n_factors) and (n_items, n_factors).
+        """
+        if not self.is_fitted:
+            raise NotFittedError(self.__class__.__name__)
+        return get_users_vectors(self.model), get_items_vectors(self.model)
+
+
+def get_users_vectors(model: AnyBayesianPersonalizedRanking) -> np.ndarray:
+    """
+    Get user vectors from BPR model as a numpy array.
+
+    Parameters
+    ----------
+    model : BayesianPersonalizedRanking
+        Fitted BPR model. Can be CPU or GPU model
+
+    Returns
+    -------
+    np.ndarray
+        User vectors.
+    """
+    if isinstance(model, GPUBayesianPersonalizedRanking):
+        return model.user_factors.to_numpy()
+    return model.user_factors
+
+
+def get_items_vectors(model: AnyBayesianPersonalizedRanking) -> np.ndarray:
+    """
+    Get item vectors from BPR model as a numpy array.
+
+    Parameters
+    ----------
+    model : BayesianPersonalizedRanking
+        Fitted BPR model. Can be CPU or GPU model
+
+    Returns
+    -------
+    np.ndarray
+        Item vectors.
+    """
+    if isinstance(model, GPUBayesianPersonalizedRanking):
+        return model.item_factors.to_numpy()
+    return model.item_factors
diff --git a/rectools/utils/serialization.py b/rectools/utils/serialization.py
index 5447aa7a..67d72c85 100644
--- a/rectools/utils/serialization.py
+++ b/rectools/utils/serialization.py
@@ -17,7 +17,7 @@
 
 import numpy as np
 import typing_extensions as tpe
-from pydantic import PlainSerializer
+from pydantic import BeforeValidator, PlainSerializer
 
 FileLike = tp.Union[str, Path, tp.IO[bytes]]
 
@@ -37,6 +37,10 @@ def _serialize_random_state(rs: tp.Optional[tp.Union[None, int, np.random.Random
     PlainSerializer(func=_serialize_random_state, when_used="json"),
 ]
 
+DType = tpe.Annotated[
+    np.dtype, BeforeValidator(func=np.dtype), PlainSerializer(func=lambda dtp: dtp.name, when_used="json")
+]
+
 
 def read_bytes(f: FileLike) -> bytes:
     """Read bytes from a file."""
diff --git a/tests/models/test_implicit_bpr.py b/tests/models/test_implicit_bpr.py
new file mode 100644
index 00000000..ac25154d
--- /dev/null
+++ b/tests/models/test_implicit_bpr.py
@@ -0,0 +1,314 @@
+import typing as tp
+from copy import deepcopy
+
+import implicit.gpu
+import numpy as np
+import pandas as pd
+import pytest
+from implicit.bpr import BayesianPersonalizedRanking
+from implicit.cpu.bpr import (
+    BayesianPersonalizedRanking as CPUBayesianPersonalizedRanking,  # pylint: disable=no-name-in-module
+)
+from implicit.gpu import HAS_CUDA
+from implicit.gpu.bpr import (
+    BayesianPersonalizedRanking as GPUBayesianPersonalizedRanking,  # pylint: disable=no-name-in-module
+)
+
+from rectools.columns import Columns
+from rectools.dataset.dataset import Dataset
+from rectools.exceptions import NotFittedError
+from rectools.models.implicit_bpr import AnyBayesianPersonalizedRanking, ImplicitBPRWrapperModel
+from rectools.models.utils import recommend_from_scores
+from tests.models.data import DATASET
+
+
+@pytest.mark.parametrize("use_gpu", (False, True) if HAS_CUDA else (False,))
+class TestImplicitBPRWrapperModel:
+    # Overwrite the model's user/item factors with fixed `linspace` values to try to make BPR deterministic
+    @staticmethod
+    def _init_model_factors_inplace(model: AnyBayesianPersonalizedRanking, dataset: Dataset) -> None:
+        n_factors = model.factors
+        n_users = dataset.user_id_map.to_internal.size
+        n_items = dataset.item_id_map.to_internal.size
+        user_factors: np.ndarray = np.linspace(0.1, 0.5, n_users * n_factors, dtype=np.float32).reshape(n_users, -1)
+        item_factors: np.ndarray = np.linspace(0.1, 0.5, n_items * n_factors, dtype=np.float32).reshape(n_items, -1)
+
+        if isinstance(model, GPUBayesianPersonalizedRanking):
+            user_factors = implicit.gpu.Matrix(user_factors)
+            item_factors = implicit.gpu.Matrix(item_factors)
+
+        model.user_factors = user_factors
+        model.item_factors = item_factors
+
+    @pytest.fixture
+    def dataset(self) -> Dataset:
+        return DATASET
+
+    @pytest.mark.parametrize(
+        "filter_viewed,expected_cpu,expected_gpu",
+        (
+            (
+                True,
+                pd.DataFrame(
+                    {
+                        Columns.User: [10, 10, 20, 20],
+                        Columns.Item: [17, 15, 17, 15],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+                pd.DataFrame(
+                    {
+                        Columns.User: [10, 10, 20, 20],
+                        Columns.Item: [17, 15, 17, 15],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+            ),
+            (
+                False,
+                pd.DataFrame(
+                    {
+                        Columns.User: [10, 10, 20, 20],
+                        Columns.Item: [11, 12, 11, 17],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+                pd.DataFrame(
+                    {
+                        Columns.User: [10, 10, 20, 20],
+                        Columns.Item: [17, 15, 17, 15],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+            ),
+        ),
+    )
+    def test_basic(
+        self,
+        dataset: Dataset,
+        filter_viewed: bool,
+        expected_cpu: pd.DataFrame,
+        expected_gpu: pd.DataFrame,
+        use_gpu: bool,
+    ) -> None:
+        base_model = BayesianPersonalizedRanking(
+            factors=2, num_threads=2, iterations=100, use_gpu=use_gpu, random_state=1
+        )
+        self._init_model_factors_inplace(base_model, dataset)
+        model = ImplicitBPRWrapperModel(model=base_model).fit(dataset)
+        actual = model.recommend(
+            users=np.array([10, 20]),
+            dataset=dataset,
+            k=2,
+            filter_viewed=filter_viewed,
+        )
+        expected = expected_gpu if use_gpu else expected_cpu
+        pd.testing.assert_frame_equal(actual.drop(columns=Columns.Score), expected)
+        pd.testing.assert_frame_equal(
+            actual.sort_values([Columns.User, Columns.Score], ascending=[True, False]).reset_index(drop=True),
+            actual,  # i.e. `actual` is already ordered by descending score within each user
+        )
+
+    def test_consistent_with_pure_implicit(self, dataset: Dataset, use_gpu: bool) -> None:
+        base_model = BayesianPersonalizedRanking(
+            factors=2, num_threads=2, iterations=100, use_gpu=use_gpu, random_state=42
+        )
+        self._init_model_factors_inplace(base_model, dataset)
+        users = np.array([10, 20, 30, 40])
+
+        model_for_wrap = deepcopy(base_model)
+        wrapper_model = ImplicitBPRWrapperModel(model=model_for_wrap).fit(dataset)
+        actual_reco = wrapper_model.recommend(users=users, dataset=dataset, k=3, filter_viewed=False)
+
+        ui_csr = dataset.get_user_item_matrix(include_weights=True)
+        base_model.fit(ui_csr)
+        for user_id in users:
+            internal_id = dataset.user_id_map.convert_to_internal([user_id])[0]
+            expected_ids, expected_scores = base_model.recommend(
+                userid=internal_id,
+                user_items=ui_csr[internal_id],
+                N=3,
+                filter_already_liked_items=False,
+            )
+            actual_ids = actual_reco.loc[actual_reco[Columns.User] == user_id, Columns.Item].values
+            actual_internal_ids = dataset.item_id_map.convert_to_internal(actual_ids)
+            actual_scores = actual_reco.loc[actual_reco[Columns.User] == user_id, Columns.Score].values
+            np.testing.assert_equal(actual_internal_ids, expected_ids)
+            np.testing.assert_allclose(actual_scores, expected_scores, atol=0.03)
+
+    @pytest.mark.parametrize(
+        "filter_viewed,expected",
+        (
+            (
+                True,
+                {10: {13, 17}, 20: {17}},
+            ),
+            (
+                False,
+                {10: {11, 13, 17}, 20: {11, 13, 17}},
+            ),
+        ),
+    )
+    def test_with_allowlist(
+        self,
+        dataset: Dataset,
+        filter_viewed: bool,
+        expected: tp.Dict[int, tp.Set[int]],
+        use_gpu: bool,
+    ) -> None:
+        base_model = BayesianPersonalizedRanking(
+            factors=32, num_threads=2, iterations=100, use_gpu=use_gpu, random_state=42
+        )
+        model = ImplicitBPRWrapperModel(model=base_model).fit(dataset)
+        actual = model.recommend(
+            users=np.array([10, 20]),
+            dataset=dataset,
+            k=3,
+            filter_viewed=filter_viewed,
+            items_to_recommend=np.array([11, 13, 17]),
+        )
+        for uid in (10, 20):
+            assert set(actual.loc[actual[Columns.User] == uid, Columns.Item]) == expected[uid]
+
+    @pytest.mark.parametrize(
+        "filter_itself,allowlist,expected",
+        (
+            (
+                False,
+                None,
+                pd.DataFrame(
+                    {
+                        Columns.TargetItem: [11, 11, 12, 12],
+                        Columns.Item: [11, 12, 12, 11],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+            ),
+            (
+                True,
+                None,
+                pd.DataFrame(
+                    {
+                        Columns.TargetItem: [11, 11, 12, 12],
+                        Columns.Item: [12, 14, 11, 14],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+            ),
+            (
+                False,
+                np.array([11, 15, 14]),
+                pd.DataFrame(
+                    {
+                        Columns.TargetItem: [11, 11, 12, 12],
+                        Columns.Item: [11, 14, 11, 14],
+                        Columns.Rank: [1, 2, 1, 2],
+                    }
+                ),
+            ),
+        ),
+    )
+    def test_i2i(
+        self,
+        dataset: Dataset,
+        filter_itself: bool,
+        allowlist: tp.Optional[np.ndarray],
+        expected: pd.DataFrame,
+        use_gpu: bool,
+    ) -> None:
+        base_model = BayesianPersonalizedRanking(
+            factors=2, num_threads=2, iterations=100, use_gpu=use_gpu, random_state=1
+        )
+        self._init_model_factors_inplace(base_model, dataset)
+        model = ImplicitBPRWrapperModel(model=base_model).fit(dataset)
+        actual = model.recommend_to_items(
+            target_items=np.array([11, 12]),
+            dataset=dataset,
+            k=2,
+            filter_itself=filter_itself,
+            items_to_recommend=allowlist,
+        )
+        pd.testing.assert_frame_equal(actual.drop(columns=Columns.Score), expected)
+        pd.testing.assert_frame_equal(
+            actual.sort_values([Columns.TargetItem, Columns.Rank], ascending=[True, True]).reset_index(drop=True),
+            actual,  # i.e. `actual` is already ordered by rank within each target item
+        )
+
+    def test_get_vectors(self, dataset: Dataset, use_gpu: bool) -> None:
+        base_model = BayesianPersonalizedRanking(use_gpu=use_gpu)
+        model = ImplicitBPRWrapperModel(model=base_model).fit(dataset)
+        users_embeddings, item_embeddings = model.get_vectors()
+        predictions = users_embeddings @ item_embeddings.T
+        vectors_predictions = [recommend_from_scores(predictions[i], k=5) for i in range(4)]
+        vectors_reco = np.array([vp[0] for vp in vectors_predictions]).ravel()
+        vectors_scores = np.array([vp[1] for vp in vectors_predictions]).ravel()
+        _, reco_item_ids, reco_scores = model._recommend_u2i(  # pylint: disable=protected-access
+            user_ids=dataset.user_id_map.convert_to_internal(np.array([10, 20, 30, 40])),
+            dataset=dataset,
+            k=5,
+            filter_viewed=False,
+            sorted_item_ids_to_recommend=None,
+        )
+        np.testing.assert_equal(vectors_reco, reco_item_ids)
+        np.testing.assert_almost_equal(vectors_scores, reco_scores, decimal=5)
+
+    def test_raises_when_get_vectors_from_not_fitted(self, use_gpu: bool) -> None:
+        model = ImplicitBPRWrapperModel(model=BayesianPersonalizedRanking(use_gpu=use_gpu))
+        with pytest.raises(NotFittedError):
+            model.get_vectors()
+
+    def test_u2i_with_cold_users(self, use_gpu: bool, dataset: Dataset) -> None:
+        base_model = BayesianPersonalizedRanking(use_gpu=use_gpu)
+        model = ImplicitBPRWrapperModel(model=base_model).fit(dataset)
+        with pytest.raises(ValueError, match="doesn't support recommendations for cold users"):
+            model.recommend(
+                users=[10, 20, 50],
+                dataset=dataset,
+                k=2,
+                filter_viewed=False,
+            )
+
+    def test_i2i_with_warm_and_cold_items(self, use_gpu: bool, dataset: Dataset) -> None:
+        base_model = BayesianPersonalizedRanking(use_gpu=use_gpu)
+        model = ImplicitBPRWrapperModel(model=base_model).fit(dataset)
+        with pytest.raises(ValueError, match="doesn't support recommendations for cold items"):
+            model.recommend_to_items(
+                target_items=[11, 12, 16],
+                dataset=dataset,
+                k=2,
+            )
+
+
+class TestImplicitBPRWrapperModelConfiguration:
+    def setup_method(self) -> None:
+        implicit.gpu.HAS_CUDA = True  # NOTE(review): global flag, never restored - presumably mimics CUDA; confirm
+
+    @pytest.mark.parametrize("use_gpu", (False, True))
+    @pytest.mark.parametrize("cls", (None, "BayesianPersonalizedRanking", "implicit.bpr.BayesianPersonalizedRanking"))
+    def test_from_config(self, use_gpu: bool, cls: tp.Any) -> None:
+        config: tp.Dict = {
+            "model": {
+                "factors": 10,
+                "learning_rate": 0.01,
+                "regularization": 0.01,
+                "iterations": 100,
+                "num_threads": 2,
+                "verify_negative_samples": False,
+                "use_gpu": use_gpu,
+            },
+            "verbose": 1,
+        }
+        if cls is not None:
+            config["model"]["cls"] = cls
+        model = ImplicitBPRWrapperModel.from_config(config)
+        assert model.verbose == 1
+        inner_model = model._model  # pylint: disable=protected-access
+        assert inner_model.factors == 10
+        assert inner_model.learning_rate == 0.01
+        assert inner_model.regularization == 0.01
+        assert inner_model.iterations == 100
+        assert inner_model.verify_negative_samples is False
+        if not use_gpu:
+            assert inner_model.num_threads == 2
+        expected_model_class = GPUBayesianPersonalizedRanking if use_gpu else CPUBayesianPersonalizedRanking
+        assert isinstance(inner_model, expected_model_class)
diff --git a/tests/tools/test_ann.py b/tests/tools/test_ann.py
index ff430639..a53156a8 100644
--- a/tests/tools/test_ann.py
+++ b/tests/tools/test_ann.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 
 import pickle
-from typing import Callable, Dict, Hashable, List, Union
+from collections.abc import Hashable
+from typing import Callable, Dict, List, Union
 
 import numpy as np
 import pytest