rec_engine.py
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.mllib.recommendation import ALS
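
# rec_engine.py builds a simple song recommender with MLlib's ALS:
# it loads (user, song, play count) triplets plus song metadata, maps the
# string ids to integers, trains a matrix-factorization model, and prints
# the top predicted songs that a chosen user has not listened to yet.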
# Reuse any running SparkContext and wrap it in a SparkSession.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
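
# Schema for the play-count triplets: userId, songId, Plays.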
plays_df_schema = StructType([
    StructField('userId', StringType()),
    StructField('songId', StringType()),
    StructField('Plays', IntegerType())
])
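
# Schema for the song metadata file.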
metadata_df_schema = StructType([
    StructField('songId', StringType()),
    StructField('title', StringType()),
    StructField('release', StringType()),
    StructField('artist_name', StringType()),
    StructField('year', IntegerType())
])
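
# Load the raw play-count triplets (tab-delimited, no header row).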
userdata = spark.read.format('csv') \
    .options(delimiter='\t', header=False, inferSchema=False) \
    .schema(plays_df_schema) \
    .load('10000.txt')
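
# Load the song metadata (comma-delimited, no header row).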
metadata = spark.read.format('csv') \
    .options(delimiter=',', header=False, inferSchema=False) \
    .schema(metadata_df_schema) \
    .load('song_data.csv')
# Map the string userId and songId values to consecutive integer ids,
# since MLlib's ALS works with integer user and product ids.
# row_number() over an ordered window assigns each distinct value a stable,
# consecutive id (a Python UDF with a global counter is not reliable across
# partitions).
userId_change = userdata.select('userId').distinct() \
    .withColumn('new_userId', F.row_number().over(Window.orderBy('userId')))
songId_change = userdata.select('songId').distinct() \
    .withColumn('new_songId', F.row_number().over(Window.orderBy('songId')))
unique_users = userId_change.count()
unique_songs = songId_change.count()
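
# Attach the integer ids to every play record via joins on the string ids.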
userdata = userdata.join(userId_change, 'userId').join(songId_change, 'songId')
# userdata.show(5)
# metadata.show(5)
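
# Build (user, product, rating) tuples for ALS. After the two joins the
# column order is (songId, userId, Plays, new_userId, new_songId), so
# positions 3, 4 and 2 give (new_userId, new_songId, Plays).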
rdd = userdata.rdd.map(tuple).map(lambda x: (int(x[3]), int(x[4]), int(x[2])))
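# Train an explicit-feedback ALS model with rank 10 and 5 iterations.
# Play counts are implicit feedback, so ALS.trainImplicit is a reasonable
# alternative to experiment with here.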
model = ALS.train(rdd, 10, 5)
# print('Enter a user id to show personalized recommendations:')
# UserID = int(input())
UserID = 13
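
# Songs this user has already listened to, with metadata for display.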
listened_songs = userdata.filter(userdata.new_userId == UserID) \
    .join(metadata, 'songId') \
    .select('new_songId', 'artist_name', 'title')
# generate list of listened song ids
listened_songs_list = [row['new_songId'] for row in listened_songs.collect()]
print(f'Songs user {UserID} has listened to:')
listened_songs.select('artist_name', 'title').show()
# generate dataframe of unlistened songs
unlistened_songs = userdata.filter(~userdata['new_songId'].isin(listened_songs_list)) \
    .select('new_songId').withColumn('new_userId', F.lit(UserID)).distinct()
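# predictAll expects an RDD of (user, product) pairs.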
unlistened_songs_rdd = unlistened_songs.rdd.map(tuple).map(lambda x: (int(x[1]), int(x[0])))
# feed unlistened songs into model
predicted_listens_rdd = model.predictAll(unlistened_songs_rdd)
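# Each prediction is a Rating(user, product, rating); convert back to a
# DataFrame so it can be joined with the song metadata.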
predicted_listens = predicted_listens_rdd.toDF(['new_userId', 'new_songId', 'rating'])
print('Songs recommended by us:')
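# Join predictions back to the song ids and metadata, then show the ten
# songs with the highest predicted score.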
predicted_listens.join(userdata, 'new_songId') \
    .join(metadata, 'songId') \
    .select('artist_name', 'title', 'rating') \
    .distinct() \
    .orderBy('rating', ascending=False) \
    .show(10)