diff --git a/master/master.go b/master/master.go index 60277b84a..a2ae848c0 100644 --- a/master/master.go +++ b/master/master.go @@ -95,7 +95,7 @@ type Master struct { // events fitTicker *time.Ticker - insertedChan chan bool // feedback inserted events + importedChan chan bool // feedback inserted events } // NewMaster creates a master node. @@ -138,7 +138,7 @@ func NewMaster(cfg *config.Config) *Master { WebService: new(restful.WebService), }, fitTicker: time.NewTicker(time.Duration(cfg.Recommend.FitPeriod) * time.Minute), - insertedChan: make(chan bool), + importedChan: make(chan bool), } } @@ -237,10 +237,10 @@ func (m *Master) RunPrivilegedTasksLoop() { err error ) go func() { - m.insertedChan <- true + m.importedChan <- true for { - if m.hasFeedbackInserted() { - m.insertedChan <- true + if m.checkDataImported() { + m.importedChan <- true } time.Sleep(time.Second) } @@ -248,7 +248,7 @@ func (m *Master) RunPrivilegedTasksLoop() { for { select { case <-m.fitTicker.C: - case <-m.insertedChan: + case <-m.importedChan: } // pre-lock privileged tasks tasksNames := []string{TaskLoadDataset, TaskFindItemNeighbors, TaskFindUserNeighbors, TaskFitRankingModel, TaskFitClickModel} @@ -328,13 +328,14 @@ func (m *Master) RunRagtagTasksLoop() { } } -func (m *Master) hasFeedbackInserted() bool { - numInserted, err := m.CacheClient.GetInt(cache.GlobalMeta, cache.NumInserted) +func (m *Master) checkDataImported() bool { + isDataImported, err := m.CacheClient.GetInt(cache.GlobalMeta, cache.DataImported) if err != nil { + base.Logger().Error("failed to read meta", zap.Error(err)) return false } - if numInserted > 0 { - err = m.CacheClient.SetInt(cache.GlobalMeta, cache.NumInserted, 0) + if isDataImported > 0 { + err = m.CacheClient.SetInt(cache.GlobalMeta, cache.DataImported, 0) if err != nil { base.Logger().Error("failed to write meta", zap.Error(err)) } @@ -342,3 +343,10 @@ func (m *Master) hasFeedbackInserted() bool { } return false } + +func (m *Master) notifyDataImported() { + err := m.CacheClient.IncrInt(cache.GlobalMeta, cache.DataImported) + if err != nil { + base.Logger().Error("failed to write meta", zap.Error(err)) + } +} diff --git a/master/rest.go b/master/rest.go index dfdc677d9..917df9b78 100644 --- a/master/rest.go +++ b/master/rest.go @@ -562,6 +562,7 @@ func (m *Master) importUsers(response http.ResponseWriter, file io.Reader, hasHe return } } + m.notifyDataImported() timeUsed := time.Since(timeStart) base.Logger().Info("complete import users", zap.Duration("time_used", timeUsed), @@ -684,6 +685,7 @@ func (m *Master) importItems(response http.ResponseWriter, file io.Reader, hasHe return } } + m.notifyDataImported() timeUsed := time.Since(timeStart) base.Logger().Info("complete import items", zap.Duration("time_used", timeUsed), @@ -844,6 +846,7 @@ func (m *Master) importFeedback(response http.ResponseWriter, file io.Reader, ha return } } + m.notifyDataImported() timeUsed := time.Since(timeStart) base.Logger().Info("complete import feedback", zap.Duration("time_used", timeUsed), diff --git a/server/rest.go b/server/rest.go index 621c227d0..ea0df498f 100644 --- a/server/rest.go +++ b/server/rest.go @@ -1353,10 +1353,6 @@ func (s *RestServer) InsertFeedbackToCache(feedback []data.Feedback) error { if err != nil { return errors.Trace(err) } - err = s.CacheClient.IncrInt(cache.GlobalMeta, cache.NumInserted) - if err != nil { - return errors.Trace(err) - } } return nil } diff --git a/storage/cache/database.go b/storage/cache/database.go index 51e6afe4c..9dc30844e 100644 --- a/storage/cache/database.go +++ b/storage/cache/database.go @@ -39,9 +39,7 @@ const ( // GlobalMeta is global meta information GlobalMeta = "global_meta" - NumInserted = "num_inserted" - LastUpdatePopularTime = "last_update_popular_time" - LastUpdateLatestTime = "last_update_latest_time" + DataImported = "data_imported" LastFitRankingModelTime = "last_fit_match_model_time" LastRankingModelVersion = "latest_match_model_version" )