Skip to content

Commit

Permalink
recover from cache lost
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Jun 5, 2021
1 parent cda9ce9 commit 164982c
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 49 deletions.
51 changes: 40 additions & 11 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ import (
"google.golang.org/grpc"
)

const (
ServerNode = "Server"
WorkerNode = "Worker"
)

// Master is the master node.
type Master struct {
protocol.UnimplementedMasterServer
server.RestServer
Expand All @@ -66,8 +62,13 @@ type Master struct {
rankingModelSearcher *ranking.ModelSearcher

localCache *LocalCache

// events
ticker *time.Ticker
insertedChan chan bool // feedback inserted events
}

// NewMaster creates a master node.
func NewMaster(cfg *config.Config) *Master {
rand.Seed(time.Now().UnixNano())
return &Master{
Expand All @@ -87,9 +88,12 @@ func NewMaster(cfg *config.Config) *Master {
EnableAuth: false,
WebService: new(restful.WebService),
},
ticker: time.NewTicker(time.Duration(cfg.Recommend.SearchPeriod) * time.Minute),
insertedChan: make(chan bool),
}
}

// Serve starts the master node.
func (m *Master) Serve() {

// load local cached model
Expand Down Expand Up @@ -164,14 +168,27 @@ func (m *Master) FitLoop() {
var bestName string
var bestModel ranking.Model
var bestScore ranking.Score
go func() {
m.insertedChan <- true
for {
if m.hasFeedbackInserted() {
m.insertedChan <- true
}
time.Sleep(time.Second)
}
}()
for {
select {
case <-m.ticker.C:
case <-m.insertedChan:
}
// download dataset
base.Logger().Info("load dataset for model fit", zap.Strings("feedback_types", m.GorseConfig.Database.PositiveFeedbackType))
dataSet, items, feedbacks, err := ranking.LoadDataFromDatabase(m.DataClient, m.GorseConfig.Database.PositiveFeedbackType,
m.GorseConfig.Database.ItemTTL, m.GorseConfig.Database.PositiveFeedbackTTL)
if err != nil {
base.Logger().Error("failed to load database", zap.Error(err))
goto sleep
continue
}
// save stats
if err = m.CacheClient.SetString(cache.GlobalMeta, cache.NumUsers, strconv.Itoa(dataSet.UserCount())); err != nil {
Expand All @@ -186,7 +203,7 @@ func (m *Master) FitLoop() {
// sleep if empty
if dataSet.Count() == 0 {
base.Logger().Warn("empty dataset", zap.Strings("feedback_type", m.GorseConfig.Database.PositiveFeedbackType))
goto sleep
continue
}
// check best model
bestName, bestModel, bestScore = m.rankingModelSearcher.GetBestModel()
Expand All @@ -205,7 +222,7 @@ func (m *Master) FitLoop() {
} else if dataSet.UserCount() == lastNumUsers && dataSet.ItemCount() == lastNumItems && dataSet.Count() == lastNumFeedback {
// sleep if nothing changed
m.rankingModelMutex.Unlock()
goto sleep
continue
}
m.rankingModelMutex.Unlock()
lastNumUsers, lastNumItems, lastNumFeedback = dataSet.UserCount(), dataSet.ItemCount(), dataSet.Count()
Expand All @@ -222,9 +239,6 @@ func (m *Master) FitLoop() {
m.popItem(items, feedbacks)
// collect latest items
m.latest(items)
// sleep
sleep:
time.Sleep(time.Duration(m.GorseConfig.Recommend.FitPeriod) * time.Minute)
}
}

Expand Down Expand Up @@ -261,3 +275,18 @@ func (m *Master) SearchLoop() {
time.Sleep(time.Duration(m.GorseConfig.Recommend.SearchPeriod) * time.Minute)
}
}

func (m *Master) hasFeedbackInserted() bool {
numInserted, err := m.CacheClient.GetInt(cache.GlobalMeta, cache.NumInserted)
if err != nil {
return false
}
if numInserted > 0 {
err = m.CacheClient.SetInt(cache.GlobalMeta, cache.NumInserted, 0)
if err != nil {
base.Logger().Error("failed to write meta", zap.Error(err))
}
return true
}
return false
}
24 changes: 19 additions & 5 deletions master/recommend.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2020 gorse Project Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package master

import (
Expand Down Expand Up @@ -50,7 +64,7 @@ func (m *Master) popItem(items []data.Item, feedback []data.Feedback) {
base.Logger().Error("failed to cache popular items", zap.Error(err))
}
}
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.CollectPopularTime, base.Now()); err != nil {
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastUpdatePopularTime, base.Now()); err != nil {
base.Logger().Error("failed to cache popular items", zap.Error(err))
}
}
Expand All @@ -77,7 +91,7 @@ func (m *Master) latest(items []data.Item) {
base.Logger().Error("failed to cache latest items", zap.Error(err))
}
}
if err = m.CacheClient.SetString(cache.GlobalMeta, cache.CollectLatestTime, base.Now()); err != nil {
if err = m.CacheClient.SetString(cache.GlobalMeta, cache.LastUpdateLatestTime, base.Now()); err != nil {
base.Logger().Error("failed to cache latest items time", zap.Error(err))
}
}
Expand Down Expand Up @@ -144,7 +158,7 @@ func (m *Master) similar(items []data.Item, dataset *ranking.DataSet, similarity
base.Logger().Error("failed to cache similar items", zap.Error(err))
}
close(completed)
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.CollectSimilarTime, base.Now()); err != nil {
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastUpdateNeighborTime, base.Now()); err != nil {
base.Logger().Error("failed to cache similar items", zap.Error(err))
}
}
Expand Down Expand Up @@ -203,10 +217,10 @@ func (m *Master) fitRankingModel(dataSet *ranking.DataSet, prModel ranking.Model
if err := m.DataClient.InsertMeasurement(data.Measurement{Name: "Precision@10", Value: score.Precision, Timestamp: time.Now()}); err != nil {
base.Logger().Error("failed to insert measurement", zap.Error(err))
}
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.FitMatrixFactorizationTime, base.Now()); err != nil {
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastFitRankingModelTime, base.Now()); err != nil {
base.Logger().Error("failed to write meta", zap.Error(err))
}
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.MatrixFactorizationVersion, fmt.Sprintf("%x", m.rankingModelVersion)); err != nil {
if err := m.CacheClient.SetString(cache.GlobalMeta, cache.LastRankingModelVersion, fmt.Sprintf("%x", m.rankingModelVersion)); err != nil {
base.Logger().Error("failed to write meta", zap.Error(err))
}
// caching model
Expand Down
2 changes: 1 addition & 1 deletion master/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestServer_GetRecommends(t *testing.T) {
{"7", 93},
{"8", 92},
}
err := s.CacheClient.SetScores(cache.CollaborativeItems, "0", itemIds)
err := s.CacheClient.SetScores(cache.RecommendItems, "0", itemIds)
assert.Nil(t, err)
// insert feedback
feedback := []data.Feedback{
Expand Down
5 changes: 5 additions & 0 deletions master/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type Node struct {
HttpPort int64
}

const (
ServerNode = "Server"
WorkerNode = "Worker"
)

// NewNode creates a node from Context and NodeInfo.
func NewNode(ctx context.Context, nodeInfo *protocol.NodeInfo) *Node {
node := new(Node)
Expand Down
8 changes: 6 additions & 2 deletions server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *RestServer) getCollaborative(request *restful.Request, response *restfu
}
// Get user id
userId := request.PathParameter("user-id")
s.getList(cache.CollaborativeItems, userId, request, response)
s.getList(cache.RecommendItems, userId, request, response)
}

// Recommend items to users.
Expand All @@ -446,7 +446,7 @@ func (s *RestServer) Recommend(userId string, n int) ([]string, error) {
errChan := make(chan error, 1)
go func() {
var collaborativeFilteringItems []cache.ScoredItem
collaborativeFilteringItems, err = s.CacheClient.GetScores(cache.CollaborativeItems, userId, 0, s.GorseConfig.Database.CacheSize)
collaborativeFilteringItems, err = s.CacheClient.GetScores(cache.RecommendItems, userId, 0, s.GorseConfig.Database.CacheSize)
if err != nil {
itemsChan <- nil
errChan <- err
Expand Down Expand Up @@ -1120,6 +1120,10 @@ func (s *RestServer) InsertFeedbackToCache(feedback []data.Feedback) error {
if err != nil {
return err
}
err = s.CacheClient.IncrInt(cache.GlobalMeta, cache.NumInserted)
if err != nil {
return err
}
}
return nil
}
8 changes: 4 additions & 4 deletions server/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestServer_List(t *testing.T) {
Get string
}
operators := []ListOperator{
{cache.CollaborativeItems, "0", "/api/intermediate/recommend/0"},
{cache.RecommendItems, "0", "/api/intermediate/recommend/0"},
//{cache.SubscribeItems, "0", "/subscribe/0"},
{cache.LatestItems, "", "/api/latest/"},
{cache.LatestItems, "0", "/api/latest/0"},
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestServer_GetRecommends(t *testing.T) {
s := newMockServer(t)
defer s.Close(t)
// insert recommendation
err := s.CacheClient.SetScores(cache.CollaborativeItems, "0",
err := s.CacheClient.SetScores(cache.RecommendItems, "0",
[]cache.ScoredItem{
{"1", 99},
{"2", 98},
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestServer_GetRecommends_Fallback_Similar(t *testing.T) {
s := newMockServer(t)
defer s.Close(t)
// insert recommendation
err := s.CacheClient.SetScores(cache.CollaborativeItems, "0",
err := s.CacheClient.SetScores(cache.RecommendItems, "0",
[]cache.ScoredItem{{"1", 99}, {"2", 98}, {"3", 97}, {"4", 96}})
assert.Nil(t, err)
// insert feedback
Expand Down Expand Up @@ -622,7 +622,7 @@ func TestServer_GetRecommends_Fallback_NonPersonalized(t *testing.T) {
s := newMockServer(t)
defer s.Close(t)
// insert recommendation
err := s.CacheClient.SetScores(cache.CollaborativeItems, "0",
err := s.CacheClient.SetScores(cache.RecommendItems, "0",
[]cache.ScoredItem{{"1", 99}, {"2", 98}, {"3", 97}, {"4", 96}})
assert.Nil(t, err)
// insert latest
Expand Down
31 changes: 16 additions & 15 deletions storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@ import (
)

const (
// IgnoreItems is these items that a user has read.
IgnoreItems = "ignore_items"
PopularItems = "popular_items"
LatestItems = "latest_items"
SimilarItems = "similar_items"
CollaborativeItems = "collaborative_items"
SubscribeItems = "subscribe_items"

GlobalMeta = "global_meta"
CollectPopularTime = "last_update_popular_time"
CollectLatestTime = "last_update_latest_time"
CollectSimilarTime = "last_update_similar_time"
FitMatrixFactorizationTime = "last_fit_match_model_time"
MatrixFactorizationVersion = "latest_match_model_version"

IgnoreItems = "ignore_items"
SimilarItems = "similar_items"
RecommendItems = "collaborative_items"
SubscribeItems = "subscribe_items"
PopularItems = "popular_items"
LatestItems = "latest_items"
LastActiveTime = "last_active_time"
LastUpdateRecommendTime = "last_update_recommend_time"

// GlobalMeta is global meta information
GlobalMeta = "global_meta"
NumInserted = "num_inserted"
NumUsers = "num_users"
NumItems = "num_items"
NumPositiveFeedback = "num_pos_feedback"
LastUpdatePopularTime = "last_update_popular_time"
LastUpdateLatestTime = "last_update_latest_time"
LastUpdateNeighborTime = "last_update_similar_time"
LastFitRankingModelTime = "last_fit_match_model_time"
LastRankingModelVersion = "latest_match_model_version"
)

var ErrObjectNotExist = fmt.Errorf("object not exists")
Expand Down Expand Up @@ -90,6 +90,7 @@ type Database interface {
SetTime(prefix, name string, val time.Time) error
GetInt(prefix, name string) (int, error)
SetInt(prefix, name string, val int) error
IncrInt(prefix, name string) error
}

const redisPrefix = "redis://"
Expand Down
6 changes: 6 additions & 0 deletions storage/cache/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ func testMeta(t *testing.T, db Database) {
valInt, err := db.GetInt("meta", "1")
assert.Nil(t, err)
assert.Equal(t, 2, valInt)
// increase meta int
err = db.IncrInt("meta", "1")
assert.Nil(t, err)
valInt, err = db.GetInt("meta", "1")
assert.Nil(t, err)
assert.Equal(t, 3, valInt)
// set meta time
err = db.SetTime("meta", "1", time.Date(1996, 4, 8, 0, 0, 0, 0, time.UTC))
assert.Nil(t, err)
Expand Down
5 changes: 5 additions & 0 deletions storage/cache/no_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ func (NoDatabase) GetTime(prefix, name string) (time.Time, error) {
func (NoDatabase) SetTime(prefix, name string, val time.Time) error {
return ErrNoDatabase
}

// IncrInt method of NoDatabase returns ErrNoDatabase.
func (NoDatabase) IncrInt(prefix, name string) error {
return ErrNoDatabase
}
10 changes: 10 additions & 0 deletions storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ func (r *Redis) SetInt(prefix, name string, val int) error {
return r.SetString(prefix, name, strconv.Itoa(val))
}

// IncrInt increase a integer in Redis.
func (r *Redis) IncrInt(prefix, name string) error {
var ctx = context.Background()
key := prefix + "/" + name
if err := r.client.Incr(ctx, key).Err(); err != nil {
return err
}
return nil
}

// GetTime returns a time from Redis.
func (r *Redis) GetTime(prefix, name string) (time.Time, error) {
val, err := r.GetString(prefix, name)
Expand Down
26 changes: 16 additions & 10 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (w *Worker) Recommend(m ranking.Model, users []string) {
}
// save result
elems, scores := recItems.PopAll()
if err = w.cacheClient.SetScores(cache.CollaborativeItems, userId, cache.CreateScoredItems(elems, scores)); err != nil {
if err = w.cacheClient.SetScores(cache.RecommendItems, userId, cache.CreateScoredItems(elems, scores)); err != nil {
base.Logger().Error("failed to cache recommendation", zap.Error(err))
return err
}
Expand All @@ -412,24 +412,30 @@ func (w *Worker) Recommend(m ranking.Model, users []string) {
}

// checkRecommendCacheTimeout checks if recommend cache stale.
// 1. if active time > recommend time, stale.
// 2. if recommend time + timeout < now, stale.
// 1. if cache is empty, stale.
// 2. if active time > recommend time, stale.
// 3. if recommend time + timeout < now, stale.
func (w *Worker) checkRecommendCacheTimeout(userId string) bool {
var activeTime, recommendTime time.Time
var err error
// check cache
items, err := w.cacheClient.GetScores(cache.RecommendItems, userId, 0, -1)
if err != nil {
base.Logger().Error("failed to read meta", zap.Error(err))
return true
} else if len(items) == 0 {
return true
}
// read active time
activeTime, err = w.cacheClient.GetTime(cache.LastActiveTime, userId)
if err != nil && err != cache.ErrObjectNotExist {
if err != nil {
base.Logger().Error("failed to read meta", zap.Error(err))
return true
}
// read recommend time
recommendTime, err = w.cacheClient.GetTime(cache.LastUpdateRecommendTime, userId)
if err != nil {
if err != cache.ErrObjectNotExist {
base.Logger().Error("failed to read meta", zap.Error(err))
} else {
return true
}
base.Logger().Error("failed to read meta", zap.Error(err))
return true
}
// check time
if activeTime.Unix() < recommendTime.Unix() {
Expand Down
Loading

0 comments on commit 164982c

Please sign in to comment.