-
Notifications
You must be signed in to change notification settings - Fork 0
/
matfac.py
183 lines (136 loc) · 7.02 KB
/
matfac.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
"""
Weighted Matrix Factorization class to factorize a ratings matrix into two
matrices, representing the users embeddings and the items embeddings.
The optimization is performed using WALS (Weighted Alternating Least Squares).
Author(s): Enrico Stefanel ([email protected])
Date: 2024-01-18
"""
import logging
import numpy as np
from typing import Tuple
from numpy.linalg import solve
from tqdm import tqdm
# Configure logging at import time: DEBUG threshold, records rendered as
# "<timestamp> - <level name> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Logger used by this module, named after the module itself.
logger = logging.getLogger(__name__)
class WeightedMatrixFactorization:
    """
    Weighted Matrix Factorization class to factorize a ratings matrix into two
    matrices, representing the users embeddings and the items embeddings.

    Unobserved ratings are marked with NaN in the input. They are stored as 0
    and still take part in the fit, with their own weight.
    """

    def __init__(self, ratings, weight_observed:float=1.0, weight_unobserved:float=0.1, num_factors:int=100, lambda_reg:float=0.05, num_iterations:int=10) -> None:
        """
        Initialize the weighted matrix factorization model.

        Input(s):   ratings:            The ratings matrix (users x items), as a
                                        2-D array-like. NaN marks an unobserved
                                        rating.
                    weight_observed:    Weight for observed ratings. Default: 1.0
                    weight_unobserved:  Weight for unobserved ratings. Default: 0.1
                    num_factors:        Number of factors. Default: 100
                    lambda_reg:         Regularization term. Default: 0.05
                    num_iterations:     Number of iterations. Default: 10

        Output(s):  None

        Raises:     ValueError if the ratings matrix is not 2-dimensional.
        """
        # Convert once, so the mask and the ratings come from the same ndarray
        ratings_array = np.array(ratings)
        if ratings_array.ndim != 2:
            raise ValueError(
                "The ratings matrix must be 2-dimensional (users x items), "
                f"got {ratings_array.ndim} dimension(s)."
            )

        # Boolean matrix: True where a rating is observed, False where it is NaN
        self.observed_data = ~np.isnan(ratings_array)
        # Replace NaN values with 0 (`nan` must be passed by keyword: the
        # second positional parameter of nan_to_num is `copy`)
        self.ratings = np.nan_to_num(ratings_array, nan=0.0)
        self.num_users, self.num_items = self.ratings.shape

        self.weight_observed = weight_observed      # Weight for observed ratings
        self.weight_unobserved = weight_unobserved  # Weight for unobserved ratings
        self.num_factors = num_factors              # Number of factors
        self.lambda_reg = lambda_reg                # Regularization term
        self.num_iterations = num_iterations        # Number of iterations for the fitting process
        self.is_fitted = False                      # Flag to check if the model has been fitted

        # Initialize user and item matrices with random values in [0, 1)
        self.user_matrix = np.random.rand(self.num_users, self.num_factors)
        self.item_matrix = np.random.rand(self.num_items, self.num_factors)

    def get_embeddings(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get the user and item embeddings.

        Input(s):   None

        Output(s):  - user_matrix:  The user embeddings matrix.
                    - item_matrix:  The item embeddings matrix.

        Raises:     ValueError if fit() has not completed yet.
        """
        if not self.is_fitted:
            raise ValueError("The ratings matrix has not been factorized yet. Please call the fit() method first.")

        return self.user_matrix, self.item_matrix

    def fit(self, method:str="WALS", **kwargs) -> None:
        """
        Train the weighted matrix factorization model using one of the
        implemented methods.

        Input(s):   - method:   Method to use for training. Default: WALS
                                (Weighted Alternating Least Squares)
                    - **kwargs: Keyword arguments forwarded to the training
                                method.

        Output(s):  - None

        Raises:     NotImplementedError if the method is not supported.
        """
        if method != "WALS":
            # Raise an error if the method is not supported
            raise NotImplementedError(f"Method {method} not supported. Please choose one of the followings: 'WALS'.")

        # Train the model using WALS (Weighted Alternating Least Squares)
        self.__fit_wals(**kwargs)
        self.is_fitted = True  # The embeddings can now be retrieved

    def __fit_wals(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        Train the weighted matrix factorization model using WALS (Weighted
        Alternating Least Squares): on each iteration the users matrix is
        updated first, then the items matrix.

        Input(s):   None

        Output(s):  - user_matrix:  The user embeddings matrix.
                    - item_matrix:  The item embeddings matrix.
        """
        for iteration in tqdm(range(self.num_iterations)):
            self.__update_users_matrix()
            self.__update_items_matrix()

            # The loss is only reported through the debug log, so skip the
            # dense users x items product when nobody will read it
            if logger.isEnabledFor(logging.DEBUG):
                # Sum of squared differences between the observed ratings and
                # the dot product of the user and item vectors
                loss = np.sum(
                    np.where(
                        self.observed_data,
                        (self.ratings - self.user_matrix @ self.item_matrix.T) ** 2,
                        0
                    )
                )
                logger.debug(f"Iteration: {iteration + 1} -> Loss: {loss}")

        return self.user_matrix, self.item_matrix

    def __normalized_weights(self, observed_mask: np.ndarray) -> np.ndarray:
        """
        Build the per-entry weights for one row or column of the ratings.

        Observed entries share `weight_observed` equally, unobserved entries
        share `weight_unobserved` equally.

        Input(s):   observed_mask:  1-D boolean array, True where observed.

        Output(s):  1-D array of weights, same length as observed_mask.
        """
        num_observed = int(np.count_nonzero(observed_mask))
        num_unobserved = observed_mask.size - num_observed

        # A group with no entries is never selected by np.where below, so its
        # weight is irrelevant: use 0.0 instead of dividing by zero
        observed_weight = self.weight_observed / num_observed if num_observed else 0.0
        unobserved_weight = self.weight_unobserved / num_unobserved if num_unobserved else 0.0

        return np.where(observed_mask, observed_weight, unobserved_weight)

    def __solve_embedding(self, fixed_matrix: np.ndarray, targets: np.ndarray, observed_mask: np.ndarray, regularization: np.ndarray) -> np.ndarray:
        """
        Solve the regularized weighted least squares problem for one
        embedding vector, keeping the other side's embeddings fixed.

        Input(s):   fixed_matrix:   Embeddings held fixed (one row per entry
                                    of targets).
                    targets:        Ratings to approximate (a row or a column
                                    of the ratings matrix).
                    observed_mask:  Boolean mask of the observed targets.
                    regularization: lambda_reg times the identity matrix.

        Output(s):  The embedding vector, of length num_factors.
        """
        weights = self.__normalized_weights(observed_mask)

        # Scaling the rows by the weights is equivalent to multiplying by
        # diag(weights), without materializing the dense diagonal matrix
        weighted_fixed = fixed_matrix * weights[:, np.newaxis]

        # Solve the system of linear equations with numpy.linalg.solve
        return solve(
            fixed_matrix.T @ weighted_fixed + regularization,
            weighted_fixed.T @ targets
        )

    def __update_users_matrix(self) -> None:
        """
        Update the users matrix using WALS (Weighted Alternating Least Squares),
        one user at a time, with the items matrix held fixed.

        Input(s):   None

        Output(s):  None
        """
        # Regularization term (identical for every user)
        regularization = self.lambda_reg * np.eye(self.num_factors)

        for user_idx in range(self.num_users):
            self.user_matrix[user_idx, :] = self.__solve_embedding(
                self.item_matrix,
                self.ratings[user_idx, :],
                self.observed_data[user_idx, :],
                regularization
            )

    def __update_items_matrix(self) -> None:
        """
        Update the items matrix using WALS (Weighted Alternating Least Squares),
        one item at a time, with the users matrix held fixed.

        Input(s):   None

        Output(s):  None
        """
        # Regularization term (identical for every item)
        regularization = self.lambda_reg * np.eye(self.num_factors)

        for item_idx in range(self.num_items):
            self.item_matrix[item_idx, :] = self.__solve_embedding(
                self.user_matrix,
                self.ratings[:, item_idx],
                self.observed_data[:, item_idx],
                regularization
            )