diff --git a/master/master.go b/master/master.go index 855c365c0..689a3a733 100644 --- a/master/master.go +++ b/master/master.go @@ -38,11 +38,7 @@ import ( "google.golang.org/grpc" ) -const ( - ServerNode = "Server" - WorkerNode = "Worker" -) - +// Master is the master node. type Master struct { protocol.UnimplementedMasterServer server.RestServer @@ -66,8 +62,13 @@ type Master struct { rankingModelSearcher *ranking.ModelSearcher localCache *LocalCache + + // events + ticker *time.Ticker + insertedChan chan bool // feedback inserted events } +// NewMaster creates a master node. func NewMaster(cfg *config.Config) *Master { rand.Seed(time.Now().UnixNano()) return &Master{ @@ -87,9 +88,12 @@ func NewMaster(cfg *config.Config) *Master { EnableAuth: false, WebService: new(restful.WebService), }, + ticker: time.NewTicker(time.Duration(cfg.Recommend.SearchPeriod) * time.Minute), + insertedChan: make(chan bool), } } +// Serve starts the master node. func (m *Master) Serve() { // load local cached model @@ -164,14 +168,27 @@ func (m *Master) FitLoop() { var bestName string var bestModel ranking.Model var bestScore ranking.Score + go func() { + m.insertedChan <- true + for { + if m.hasFeedbackInserted() { + m.insertedChan <- true + } + time.Sleep(time.Second) + } + }() for { + select { + case <-m.ticker.C: + case <-m.insertedChan: + } // download dataset base.Logger().Info("load dataset for model fit", zap.Strings("feedback_types", m.GorseConfig.Database.PositiveFeedbackType)) dataSet, items, feedbacks, err := ranking.LoadDataFromDatabase(m.DataClient, m.GorseConfig.Database.PositiveFeedbackType, m.GorseConfig.Database.ItemTTL, m.GorseConfig.Database.PositiveFeedbackTTL) if err != nil { base.Logger().Error("failed to load database", zap.Error(err)) - goto sleep + continue } // save stats if err = m.CacheClient.SetString(cache.GlobalMeta, cache.NumUsers, strconv.Itoa(dataSet.UserCount())); err != nil { @@ -186,7 +203,7 @@ func (m *Master) FitLoop() { // sleep if empty if dataSet.Count() == 0 { base.Logger().Warn("empty dataset", zap.Strings("feedback_type", m.GorseConfig.Database.PositiveFeedbackType)) - goto sleep + continue } // check best model bestName, bestModel, bestScore = m.rankingModelSearcher.GetBestModel() @@ -205,7 +222,7 @@ func (m *Master) FitLoop() { } else if dataSet.UserCount() == lastNumUsers && dataSet.ItemCount() == lastNumItems && dataSet.Count() == lastNumFeedback { // sleep if nothing changed m.rankingModelMutex.Unlock() - goto sleep + continue } m.rankingModelMutex.Unlock() lastNumUsers, lastNumItems, lastNumFeedback = dataSet.UserCount(), dataSet.ItemCount(), dataSet.Count() @@ -222,9 +239,6 @@ func (m *Master) FitLoop() { m.popItem(items, feedbacks) // collect latest items m.latest(items) - // sleep - sleep: - time.Sleep(time.Duration(m.GorseConfig.Recommend.FitPeriod) * time.Minute) } } @@ -261,3 +275,18 @@ func (m *Master) SearchLoop() { time.Sleep(time.Duration(m.GorseConfig.Recommend.SearchPeriod) * time.Minute) } } + +func (m *Master) hasFeedbackInserted() bool { + numInserted, err := m.CacheClient.GetInt(cache.GlobalMeta, cache.NumInserted) + if err != nil { + return false + } + if numInserted > 0 { + err = m.CacheClient.SetInt(cache.GlobalMeta, cache.NumInserted, 0) + if err != nil { + base.Logger().Error("failed to write meta", zap.Error(err)) + } + return true + } + return false +} diff --git a/master/recommend.go b/master/recommend.go index 28b7b3dcf..c8eaf16a7 100644 --- a/master/recommend.go +++ b/master/recommend.go @@ -1,3 +1,17 @@ +// Copyright 2020 gorse Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package master import ( @@ -50,7 +64,7 @@ func (m *Master) popItem(items []data.Item, feedback []data.Feedback) { base.Logger().Error("failed to cache popular items", zap.Error(err)) } } - if err := m.CacheClient.SetString(cache.GlobalMeta, cache.CollectPopularTime, base.Now()); err != nil { + if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastUpdatePopularTime, base.Now()); err != nil { base.Logger().Error("failed to cache popular items", zap.Error(err)) } } @@ -77,7 +91,7 @@ func (m *Master) latest(items []data.Item) { base.Logger().Error("failed to cache latest items", zap.Error(err)) } } - if err = m.CacheClient.SetString(cache.GlobalMeta, cache.CollectLatestTime, base.Now()); err != nil { + if err = m.CacheClient.SetString(cache.GlobalMeta, cache.LastUpdateLatestTime, base.Now()); err != nil { base.Logger().Error("failed to cache latest items time", zap.Error(err)) } } @@ -144,7 +158,7 @@ func (m *Master) similar(items []data.Item, dataset *ranking.DataSet, similarity base.Logger().Error("failed to cache similar items", zap.Error(err)) } close(completed) - if err := m.CacheClient.SetString(cache.GlobalMeta, cache.CollectSimilarTime, base.Now()); err != nil { + if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastUpdateNeighborTime, base.Now()); err != nil { base.Logger().Error("failed to cache similar items", zap.Error(err)) } } @@ -203,10 +217,10 @@ func (m *Master) fitRankingModel(dataSet *ranking.DataSet, prModel ranking.Model if err := m.DataClient.InsertMeasurement(data.Measurement{Name: "Precision@10", Value: score.Precision, Timestamp: time.Now()}); err != nil { base.Logger().Error("failed to insert measurement", zap.Error(err)) } - if err := m.CacheClient.SetString(cache.GlobalMeta, cache.FitMatrixFactorizationTime, base.Now()); err != nil { + if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastFitRankingModelTime, base.Now()); err != nil { base.Logger().Error("failed to write meta", zap.Error(err)) } - if err := m.CacheClient.SetString(cache.GlobalMeta, cache.MatrixFactorizationVersion, fmt.Sprintf("%x", m.rankingModelVersion)); err != nil { + if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastRankingModelVersion, fmt.Sprintf("%x", m.rankingModelVersion)); err != nil { base.Logger().Error("failed to write meta", zap.Error(err)) } // caching model diff --git a/master/rest_test.go b/master/rest_test.go index f2e700c91..d3d14fee6 100644 --- a/master/rest_test.go +++ b/master/rest_test.go @@ -426,7 +426,7 @@ func TestServer_GetRecommends(t *testing.T) { {"7", 93}, {"8", 92}, } - err := s.CacheClient.SetScores(cache.CollaborativeItems, "0", itemIds) + err := s.CacheClient.SetScores(cache.RecommendItems, "0", itemIds) assert.Nil(t, err) // insert feedback feedback := []data.Feedback{ diff --git a/master/rpc.go b/master/rpc.go index c899d5049..4b45eedbe 100644 --- a/master/rpc.go +++ b/master/rpc.go @@ -36,6 +36,11 @@ type Node struct { HttpPort int64 } +const ( + ServerNode = "Server" + WorkerNode = "Worker" +) + // NewNode creates a node from Context and NodeInfo. func NewNode(ctx context.Context, nodeInfo *protocol.NodeInfo) *Node { node := new(Node) diff --git a/server/rest.go b/server/rest.go index 3e276a827..832c98d31 100644 --- a/server/rest.go +++ b/server/rest.go @@ -429,7 +429,7 @@ func (s *RestServer) getCollaborative(request *restful.Request, response *restfu } // Get user id userId := request.PathParameter("user-id") - s.getList(cache.CollaborativeItems, userId, request, response) + s.getList(cache.RecommendItems, userId, request, response) } // Recommend items to users. @@ -446,7 +446,7 @@ func (s *RestServer) Recommend(userId string, n int) ([]string, error) { errChan := make(chan error, 1) go func() { var collaborativeFilteringItems []cache.ScoredItem - collaborativeFilteringItems, err = s.CacheClient.GetScores(cache.CollaborativeItems, userId, 0, s.GorseConfig.Database.CacheSize) + collaborativeFilteringItems, err = s.CacheClient.GetScores(cache.RecommendItems, userId, 0, s.GorseConfig.Database.CacheSize) if err != nil { itemsChan <- nil errChan <- err @@ -1120,6 +1120,10 @@ func (s *RestServer) InsertFeedbackToCache(feedback []data.Feedback) error { if err != nil { return err } + err = s.CacheClient.IncrInt(cache.GlobalMeta, cache.NumInserted) + if err != nil { + return err + } } return nil } diff --git a/server/rest_test.go b/server/rest_test.go index 6d7733c60..5f02eb53a 100644 --- a/server/rest_test.go +++ b/server/rest_test.go @@ -333,7 +333,7 @@ func TestServer_List(t *testing.T) { Get string } operators := []ListOperator{ - {cache.CollaborativeItems, "0", "/api/intermediate/recommend/0"}, + {cache.RecommendItems, "0", "/api/intermediate/recommend/0"}, //{cache.SubscribeItems, "0", "/subscribe/0"}, {cache.LatestItems, "", "/api/latest/"}, {cache.LatestItems, "0", "/api/latest/0"}, @@ -491,7 +491,7 @@ func TestServer_GetRecommends(t *testing.T) { s := newMockServer(t) defer s.Close(t) // insert recommendation - err := s.CacheClient.SetScores(cache.CollaborativeItems, "0", + err := s.CacheClient.SetScores(cache.RecommendItems, "0", []cache.ScoredItem{ {"1", 99}, {"2", 98}, @@ -557,7 +557,7 @@ func TestServer_GetRecommends_Fallback_Similar(t *testing.T) { s := newMockServer(t) defer s.Close(t) // insert recommendation - err := s.CacheClient.SetScores(cache.CollaborativeItems, "0", + err := s.CacheClient.SetScores(cache.RecommendItems, "0", []cache.ScoredItem{{"1", 99}, {"2", 98}, {"3", 97}, {"4", 96}}) assert.Nil(t, err) // insert feedback @@ -622,7 +622,7 @@ func TestServer_GetRecommends_Fallback_NonPersonalized(t *testing.T) { s := newMockServer(t) defer s.Close(t) // insert recommendation - err := s.CacheClient.SetScores(cache.CollaborativeItems, "0", + err := s.CacheClient.SetScores(cache.RecommendItems, "0", []cache.ScoredItem{{"1", 99}, {"2", 98}, {"3", 97}, {"4", 96}}) assert.Nil(t, err) // insert latest diff --git a/storage/cache/database.go b/storage/cache/database.go index d370d2f4c..3826afd38 100644 --- a/storage/cache/database.go +++ b/storage/cache/database.go @@ -23,26 +23,26 @@ import ( ) const ( - // IgnoreItems is these items that a user has read. - IgnoreItems = "ignore_items" - PopularItems = "popular_items" - LatestItems = "latest_items" - SimilarItems = "similar_items" - CollaborativeItems = "collaborative_items" - SubscribeItems = "subscribe_items" - - GlobalMeta = "global_meta" - CollectPopularTime = "last_update_popular_time" - CollectLatestTime = "last_update_latest_time" - CollectSimilarTime = "last_update_similar_time" - FitMatrixFactorizationTime = "last_fit_match_model_time" - MatrixFactorizationVersion = "latest_match_model_version" - + IgnoreItems = "ignore_items" + SimilarItems = "similar_items" + RecommendItems = "collaborative_items" + SubscribeItems = "subscribe_items" + PopularItems = "popular_items" + LatestItems = "latest_items" LastActiveTime = "last_active_time" LastUpdateRecommendTime = "last_update_recommend_time" + + // GlobalMeta is global meta information + GlobalMeta = "global_meta" + NumInserted = "num_inserted" NumUsers = "num_users" NumItems = "num_items" NumPositiveFeedback = "num_pos_feedback" + LastUpdatePopularTime = "last_update_popular_time" + LastUpdateLatestTime = "last_update_latest_time" + LastUpdateNeighborTime = "last_update_similar_time" + LastFitRankingModelTime = "last_fit_match_model_time" + LastRankingModelVersion = "latest_match_model_version" ) var ErrObjectNotExist = fmt.Errorf("object not exists") @@ -90,6 +90,7 @@ type Database interface { SetTime(prefix, name string, val time.Time) error GetInt(prefix, name string) (int, error) SetInt(prefix, name string, val int) error + IncrInt(prefix, name string) error } const redisPrefix = "redis://" diff --git a/storage/cache/database_test.go b/storage/cache/database_test.go index 7735d03a1..0662a2a96 100644 --- a/storage/cache/database_test.go +++ b/storage/cache/database_test.go @@ -39,6 +39,12 @@ func testMeta(t *testing.T, db Database) { valInt, err := db.GetInt("meta", "1") assert.Nil(t, err) assert.Equal(t, 2, valInt) + // increase meta int + err = db.IncrInt("meta", "1") + assert.Nil(t, err) + valInt, err = db.GetInt("meta", "1") + assert.Nil(t, err) + assert.Equal(t, 3, valInt) // set meta time err = db.SetTime("meta", "1", time.Date(1996, 4, 8, 0, 0, 0, 0, time.UTC)) assert.Nil(t, err) diff --git a/storage/cache/no_database.go b/storage/cache/no_database.go index 6b55a5032..5977eed52 100644 --- a/storage/cache/no_database.go +++ b/storage/cache/no_database.go @@ -78,3 +78,8 @@ func (NoDatabase) GetTime(prefix, name string) (time.Time, error) { func (NoDatabase) SetTime(prefix, name string, val time.Time) error { return ErrNoDatabase } + +// IncrInt method of NoDatabase returns ErrNoDatabase. +func (NoDatabase) IncrInt(prefix, name string) error { + return ErrNoDatabase +} diff --git a/storage/cache/redis.go b/storage/cache/redis.go index 8c67ac66e..3db03482d 100644 --- a/storage/cache/redis.go +++ b/storage/cache/redis.go @@ -152,6 +152,16 @@ func (r *Redis) SetInt(prefix, name string, val int) error { return r.SetString(prefix, name, strconv.Itoa(val)) } +// IncrInt increase a integer in Redis. +func (r *Redis) IncrInt(prefix, name string) error { + var ctx = context.Background() + key := prefix + "/" + name + if err := r.client.Incr(ctx, key).Err(); err != nil { + return err + } + return nil +} + // GetTime returns a time from Redis. func (r *Redis) GetTime(prefix, name string) (time.Time, error) { val, err := r.GetString(prefix, name) diff --git a/worker/worker.go b/worker/worker.go index 34524d49f..4c16a850b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -391,7 +391,7 @@ func (w *Worker) Recommend(m ranking.Model, users []string) { } // save result elems, scores := recItems.PopAll() - if err = w.cacheClient.SetScores(cache.CollaborativeItems, userId, cache.CreateScoredItems(elems, scores)); err != nil { + if err = w.cacheClient.SetScores(cache.RecommendItems, userId, cache.CreateScoredItems(elems, scores)); err != nil { base.Logger().Error("failed to cache recommendation", zap.Error(err)) return err } @@ -412,24 +412,30 @@ func (w *Worker) Recommend(m ranking.Model, users []string) { } // checkRecommendCacheTimeout checks if recommend cache stale. -// 1. if active time > recommend time, stale. -// 2. if recommend time + timeout < now, stale. +// 1. if cache is empty, stale. +// 2. if active time > recommend time, stale. +// 3. if recommend time + timeout < now, stale. func (w *Worker) checkRecommendCacheTimeout(userId string) bool { var activeTime, recommendTime time.Time - var err error + // check cache + items, err := w.cacheClient.GetScores(cache.RecommendItems, userId, 0, -1) + if err != nil { + base.Logger().Error("failed to read meta", zap.Error(err)) + return true + } else if len(items) == 0 { + return true + } // read active time activeTime, err = w.cacheClient.GetTime(cache.LastActiveTime, userId) - if err != nil && err != cache.ErrObjectNotExist { + if err != nil { base.Logger().Error("failed to read meta", zap.Error(err)) + return true } // read recommend time recommendTime, err = w.cacheClient.GetTime(cache.LastUpdateRecommendTime, userId) if err != nil { - if err != cache.ErrObjectNotExist { - base.Logger().Error("failed to read meta", zap.Error(err)) - } else { - return true - } + base.Logger().Error("failed to read meta", zap.Error(err)) + return true } // check time if activeTime.Unix() < recommendTime.Unix() { diff --git a/worker/worker_test.go b/worker/worker_test.go index 2d3029a68..3bff4698e 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -49,6 +49,24 @@ func TestSplit(t *testing.T) { assert.Error(t, err) } +func TestCheckRecommendCacheTimeout(t *testing.T) { + // create mock worker + w := newMockWorker(t) + defer w.Close(t) + // insert cache + err := w.cacheClient.SetScores(cache.RecommendItems, "0", []cache.ScoredItem{{"0", 0}}) + assert.Nil(t, err) + assert.True(t, w.checkRecommendCacheTimeout("0")) + err = w.cacheClient.SetTime(cache.LastActiveTime, "0", time.Now().Add(-time.Hour)) + assert.True(t, w.checkRecommendCacheTimeout("0")) + err = w.cacheClient.SetTime(cache.LastUpdateRecommendTime, "0", time.Now().Add(-time.Hour*100)) + assert.True(t, w.checkRecommendCacheTimeout("0")) + err = w.cacheClient.SetTime(cache.LastUpdateRecommendTime, "0", time.Now().Add(time.Hour*100)) + assert.False(t, w.checkRecommendCacheTimeout("0")) + err = w.cacheClient.ClearList(cache.RecommendItems, "0") + assert.True(t, w.checkRecommendCacheTimeout("0")) +} + type mockMatrixFactorizationForRecommend struct { ranking.BaseMatrixFactorization } @@ -131,7 +149,7 @@ func TestRecommendMatrixFactorization(t *testing.T) { m := newMockMatrixFactorizationForRecommend(1, 10) w.Recommend(m, []string{"0"}) - recommends, err := w.cacheClient.GetScores(cache.CollaborativeItems, "0", 0, -1) + recommends, err := w.cacheClient.GetScores(cache.RecommendItems, "0", 0, -1) assert.Nil(t, err) assert.Equal(t, []cache.ScoredItem{ {"3", 3},