# engine.py
import os
import logging

from pyspark.mllib.recommendation import ALS

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def get_counts_and_averages(ID_and_ratings_tuple):
    """Given a tuple (movieID, ratings_iterable),
    returns (movieID, (ratings_count, ratings_avg)).
    """
    nratings = len(ID_and_ratings_tuple[1])
    return ID_and_ratings_tuple[0], (nratings, float(sum(ID_and_ratings_tuple[1])) / nratings)
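
# For example (illustrative values, not taken from the dataset):
#   get_counts_and_averages((1, [4.0, 3.0, 5.0])) -> (1, (3, 4.0))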

class RecommendationEngine:
    """A movie recommendation engine."""

    def __count_and_average_ratings(self):
        """Updates the movie rating counts from the current data in self.ratings_RDD."""
        logger.info("Counting movie ratings...")
        movie_ID_with_ratings_RDD = self.ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()
        movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
        self.movies_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

    def __train_model(self):
        """Trains the ALS model with the current dataset."""
        logger.info("Training the ALS model...")
        self.model = ALS.train(self.ratings_RDD, self.rank,
                               iterations=self.iterations, lambda_=self.regularization_parameter)
        logger.info("ALS model built!")

    def __predict_ratings(self, user_and_movie_RDD):
        """Gets predictions for a given (userID, movieID) formatted RDD.
        Returns an RDD with format (movieTitle, movieRating, numRatings).
        """
        predicted_RDD = self.model.predictAll(user_and_movie_RDD)
        # predictAll returns Rating(user, product, rating); keep (movieID, predictedRating)
        predicted_rating_RDD = predicted_RDD.map(lambda x: (x.product, x.rating))
        # Join with titles and rating counts: (movieID, ((predictedRating, title), ratingsCount))
        predicted_rating_title_and_count_RDD = \
            predicted_rating_RDD.join(self.movies_titles_RDD).join(self.movies_rating_counts_RDD)
        # Reshape to (movieTitle, predictedRating, ratingsCount)
        predicted_rating_title_and_count_RDD = \
            predicted_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
        return predicted_rating_title_and_count_RDD

    def add_ratings(self, ratings):
        """Adds additional movie ratings in the format (user_id, movie_id, rating)."""
        # Convert ratings to an RDD
        new_ratings_RDD = self.sc.parallelize(ratings)
        # Add new ratings to the existing ones
        self.ratings_RDD = self.ratings_RDD.union(new_ratings_RDD)
        # Re-compute movie ratings count
        self.__count_and_average_ratings()
        # Re-train the ALS model with the new ratings
        self.__train_model()
        return ratings
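
    # For example (hypothetical IDs and ratings, not from the dataset):
    #   engine.add_ratings([(0, 50, 4.0), (0, 296, 3.5)])
    # adds two ratings for user 0 and triggers a full re-count and ALS re-train.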

    def get_most_rated(self, movies_count):
        """Returns the movies_count movies with the most ratings."""
        logger.info("Counting the total ratings for each movie...")
        # Join titles with rating counts, then keep (title, ratingsCount)
        movies = self.movies_titles_RDD.join(self.movies_rating_counts_RDD) \
            .map(lambda x: (x[1][0], x[1][1])) \
            .takeOrdered(movies_count, key=lambda x: -x[1])
        return movies

    def get_highest_rating(self, movies_count):
        """Returns the movies_count movies with the highest average rating,
        considering only movies with at least 25 ratings.
        """
        movie_ID_with_ratings_RDD = self.ratings_RDD.map(lambda x: (x[1], x[2])).groupByKey()
        movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
        # Join gives (movieID, (title, (ratingsCount, ratingsAvg))); order by average rating
        movies = self.movies_titles_RDD.join(movie_ID_with_avg_ratings_RDD) \
            .filter(lambda r: r[1][1][0] >= 25) \
            .takeOrdered(movies_count, key=lambda x: -x[1][1][1])
        return movies

    def get_ratings_for_movie_ids(self, user_id, movie_ids):
        """Given a user_id and a list of movie_ids, predicts ratings for them."""
        requested_movies_RDD = self.sc.parallelize(movie_ids).map(lambda x: (user_id, x))
        # Get predicted ratings
        ratings = self.__predict_ratings(requested_movies_RDD).collect()
        return ratings

    def get_top_ratings(self, user_id, movies_count):
        """Recommends up to movies_count top unrated movies to user_id."""
        # Build (user_id, movie_id) pairs from movies that appear in other users' ratings
        # (an approximation of the movies user_id has not rated yet)
        user_unrated_movies_RDD = self.ratings_RDD.filter(lambda rating: rating[0] != user_id) \
            .map(lambda x: (user_id, x[1])).distinct()
        # Predict ratings, keep movies with at least 25 ratings, and return the top ones
        ratings = self.__predict_ratings(user_unrated_movies_RDD) \
            .filter(lambda r: r[2] >= 25) \
            .takeOrdered(movies_count, key=lambda x: -x[1])
        return ratings

    def __init__(self, sc, dataset_path):
        """Init the recommendation engine given a Spark context and a dataset path."""
        logger.info("Starting up the Recommendation Engine: ")
        self.sc = sc
        # Load ratings data for later use
        logger.info("Loading Ratings data...")
        ratings_file_path = os.path.join(dataset_path, 'ratings1.csv')
        ratings_raw_RDD = self.sc.textFile(ratings_file_path)
        ratings_raw_data_header = ratings_raw_RDD.take(1)[0]
        # Parse CSV rows into (userID, movieID, rating) tuples, skipping the header
        self.ratings_RDD = ratings_raw_RDD.filter(lambda line: line != ratings_raw_data_header) \
            .map(lambda line: line.split(",")) \
            .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
        # Load movies data for later use
        logger.info("Loading Movies data...")
        movies_file_path = os.path.join(dataset_path, 'movies1.csv')
        movies_raw_RDD = self.sc.textFile(movies_file_path)
        movies_raw_data_header = movies_raw_RDD.take(1)[0]
        # Parse CSV rows into (movieID, title, genres) tuples, skipping the header
        self.movies_RDD = movies_raw_RDD.filter(lambda line: line != movies_raw_data_header) \
            .map(lambda line: line.split(",")) \
            .map(lambda tokens: (int(tokens[0]), tokens[1], tokens[2])).cache()
        self.movies_titles_RDD = self.movies_RDD.map(lambda x: (int(x[0]), x[1])).cache()
        # Pre-calculate movies ratings counts
        self.__count_and_average_ratings()
        # Train the model
        self.rank = 8
        self.iterations = 10
        self.regularization_parameter = 0.1
        self.__train_model()
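

# Minimal usage sketch, for illustration only. It assumes a local Spark installation and a
# dataset_path directory containing ratings1.csv and movies1.csv; the path and the user/movie
# IDs below are placeholders, not values taken from this project.
if __name__ == "__main__":
    from pyspark import SparkContext

    sc = SparkContext("local[*]", "movie_recommendation_engine")
    engine = RecommendationEngine(sc, "/path/to/dataset")

    # Top 5 recommended movies for (hypothetical) user 1
    print(engine.get_top_ratings(1, 5))

    # Predicted rating of (hypothetical) movie 500 for user 1
    print(engine.get_ratings_for_movie_ids(1, [500]))

    sc.stop()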