Skip to content

Commit

Permalink
pull latest dataset after data import (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Sep 26, 2021
1 parent c81d777 commit 90f882c
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 17 deletions.
28 changes: 18 additions & 10 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Master struct {

// events
fitTicker *time.Ticker
insertedChan chan bool // feedback inserted events
importedChan chan bool // feedback inserted events
}

// NewMaster creates a master node.
Expand Down Expand Up @@ -138,7 +138,7 @@ func NewMaster(cfg *config.Config) *Master {
WebService: new(restful.WebService),
},
fitTicker: time.NewTicker(time.Duration(cfg.Recommend.FitPeriod) * time.Minute),
insertedChan: make(chan bool),
importedChan: make(chan bool),
}
}

Expand Down Expand Up @@ -237,18 +237,18 @@ func (m *Master) RunPrivilegedTasksLoop() {
err error
)
go func() {
m.insertedChan <- true
m.importedChan <- true
for {
if m.hasFeedbackInserted() {
m.insertedChan <- true
if m.checkDataImported() {
m.importedChan <- true
}
time.Sleep(time.Second)
}
}()
for {
select {
case <-m.fitTicker.C:
case <-m.insertedChan:
case <-m.importedChan:
}
// pre-lock privileged tasks
tasksNames := []string{TaskLoadDataset, TaskFindItemNeighbors, TaskFindUserNeighbors, TaskFitRankingModel, TaskFitClickModel}
Expand Down Expand Up @@ -328,17 +328,25 @@ func (m *Master) RunRagtagTasksLoop() {
}
}

func (m *Master) hasFeedbackInserted() bool {
numInserted, err := m.CacheClient.GetInt(cache.GlobalMeta, cache.NumInserted)
func (m *Master) checkDataImported() bool {
isDataImported, err := m.CacheClient.GetInt(cache.GlobalMeta, cache.DataImported)
if err != nil {
base.Logger().Error("failed to read meta", zap.Error(err))
return false
}
if numInserted > 0 {
err = m.CacheClient.SetInt(cache.GlobalMeta, cache.NumInserted, 0)
if isDataImported > 0 {
err = m.CacheClient.SetInt(cache.GlobalMeta, cache.DataImported, 0)
if err != nil {
base.Logger().Error("failed to write meta", zap.Error(err))
}
return true
}
return false
}

func (m *Master) notifyDataImported() {
err := m.CacheClient.IncrInt(cache.GlobalMeta, cache.DataImported)
if err != nil {
base.Logger().Error("failed to write meta", zap.Error(err))
}
}
3 changes: 3 additions & 0 deletions master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ func (m *Master) importUsers(response http.ResponseWriter, file io.Reader, hasHe
return
}
}
m.notifyDataImported()
timeUsed := time.Since(timeStart)
base.Logger().Info("complete import users",
zap.Duration("time_used", timeUsed),
Expand Down Expand Up @@ -684,6 +685,7 @@ func (m *Master) importItems(response http.ResponseWriter, file io.Reader, hasHe
return
}
}
m.notifyDataImported()
timeUsed := time.Since(timeStart)
base.Logger().Info("complete import items",
zap.Duration("time_used", timeUsed),
Expand Down Expand Up @@ -844,6 +846,7 @@ func (m *Master) importFeedback(response http.ResponseWriter, file io.Reader, ha
return
}
}
m.notifyDataImported()
timeUsed := time.Since(timeStart)
base.Logger().Info("complete import feedback",
zap.Duration("time_used", timeUsed),
Expand Down
4 changes: 0 additions & 4 deletions server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,10 +1353,6 @@ func (s *RestServer) InsertFeedbackToCache(feedback []data.Feedback) error {
if err != nil {
return errors.Trace(err)
}
err = s.CacheClient.IncrInt(cache.GlobalMeta, cache.NumInserted)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
4 changes: 1 addition & 3 deletions storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ const (

// GlobalMeta is global meta information
GlobalMeta = "global_meta"
NumInserted = "num_inserted"
LastUpdatePopularTime = "last_update_popular_time"
LastUpdateLatestTime = "last_update_latest_time"
DataImported = "data_imported"
LastFitRankingModelTime = "last_fit_match_model_time"
LastRankingModelVersion = "latest_match_model_version"
)
Expand Down

0 comments on commit 90f882c

Please sign in to comment.