Skip to content

Commit

Permalink
support multiple positive feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Jun 17, 2021
1 parent 23bc2db commit e4506a1
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 93 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type DatabaseConfig struct {
AutoInsertItem bool `toml:"auto_insert_item"` // insert new items while inserting feedback
CacheSize int `toml:"cache_size"` // cache size for intermediate recommendation
PositiveFeedbackType []string `toml:"positive_feedback_types"` // positive feedback type
ClickFeedbackTypes []string `toml:"click_feedback_types"` // feedback types for click event
ReadFeedbackType string `toml:"read_feedback_type"` // feedback type for read event
PositiveFeedbackTTL uint `toml:"positive_feedback_ttl"`
ItemTTL uint `toml:"item_ttl"`
}
Expand Down
20 changes: 13 additions & 7 deletions config/config.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@
cache_store = "redis://localhost:6379"
# database for persist data (support MySQL/MongoDB)
data_store = "mysql://root@tcp(localhost:3306)/gorse?parseTime=true"
# cache size for intermediate recommendation
cache_size = 200
# insert new users while inserting feedback
auto_insert_user = true
# insert new items while inserting feedback
auto_insert_item = false
# types of positive feedback
positive_feedback_types = ["star"]
# feedback types for positive events
positive_feedback_types = ["star","like"]
# feedback types for click events
click_feedback_types = ["like"]
# feedback type for read event
read_feedback_type = "read"
# positive feedback time-to-live (days), 0 means disabled.
positive_feedback_ttl = 0
positive_feedback_ttl = 3650
# item time-to-live (days), 0 means disabled.
item_ttl = 0
item_ttl = 3650

# This section declares settings for the master node.
[master]
Expand All @@ -22,7 +28,7 @@ host = "127.0.0.1" # master host
http_port = 8088 # HTTP API port
http_host = "127.0.0.1" # HTTP API host
search_jobs = 1 # number of jobs for model search
fit_jobs = 1 # number of jobs for model fitting
fit_jobs = 2 # number of jobs for model fitting
meta_timeout = 10 # cluster meta timeout (second)

# This section declares settings for the server node.
Expand All @@ -32,10 +38,10 @@ api_key = "" # secret key for RESTful APIs (SSL required)

# This section declares settings for recommendation.
[recommend]
popular_window = 365 # timw window of popular items (days)
popular_window = 365 # time window of popular items (days)
fit_period = 10 # time period for model fitting (minutes)
search_period = 60 # time period for model searching (minutes)
max_recommend_period = 1 # time period for inactive user recommendation (days)
search_epoch = 100 # number of epochs for model searching
search_trials = 10 # number of trials for model searching
max_recommend_period = 1 # time period for inactive user recommendation (days)
fallback_recommend = "latest" # fallback method for recommendation (popular/latest)
32 changes: 18 additions & 14 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,41 @@ import (
)

func TestLoadConfig(t *testing.T) {
config, _, err := LoadConfig("../misc/config_test/config.toml")
config, _, err := LoadConfig("config.toml.template")
assert.Nil(t, err)

// database configuration
assert.Equal(t, "redis://localhost:6379", config.Database.CacheStore)
assert.Equal(t, "mysql://root@tcp(localhost:3306)/gorse?parseTime=true", config.Database.DataStore)
assert.Equal(t, true, config.Database.AutoInsertUser)
assert.Equal(t, false, config.Database.AutoInsertItem)
assert.Equal(t, []string{"star", "fork"}, config.Database.PositiveFeedbackType)
assert.Equal(t, uint(998), config.Database.PositiveFeedbackTTL)
assert.Equal(t, uint(999), config.Database.ItemTTL)
assert.Equal(t, 200, config.Database.CacheSize)
assert.Equal(t, []string{"star", "like"}, config.Database.PositiveFeedbackType)
assert.Equal(t, []string{"like"}, config.Database.ClickFeedbackTypes)
assert.Equal(t, "read", config.Database.ReadFeedbackType)
assert.Equal(t, uint(3650), config.Database.PositiveFeedbackTTL)
assert.Equal(t, uint(3650), config.Database.ItemTTL)

// master configuration
assert.Equal(t, 8086, config.Master.Port)
assert.Equal(t, "127.0.0.1", config.Master.Host)
assert.Equal(t, 8088, config.Master.HttpPort)
assert.Equal(t, "127.0.0.1", config.Master.HttpHost)
assert.Equal(t, 3, config.Master.SearchJobs)
assert.Equal(t, 4, config.Master.FitJobs)
assert.Equal(t, 30, config.Master.MetaTimeout)
assert.Equal(t, 1, config.Master.SearchJobs)
assert.Equal(t, 2, config.Master.FitJobs)
assert.Equal(t, 10, config.Master.MetaTimeout)

// server configuration
assert.Equal(t, 128, config.Server.DefaultN)
assert.Equal(t, "p@ssword", config.Server.APIKey)
assert.Equal(t, 10, config.Server.DefaultN)
assert.Equal(t, "", config.Server.APIKey)

// recommend configuration
assert.Equal(t, 12, config.Recommend.PopularWindow)
assert.Equal(t, 66, config.Recommend.FitPeriod)
assert.Equal(t, 88, config.Recommend.SearchPeriod)
assert.Equal(t, 102, config.Recommend.SearchEpoch)
assert.Equal(t, 9, config.Recommend.SearchTrials)
assert.Equal(t, 365, config.Recommend.PopularWindow)
assert.Equal(t, 10, config.Recommend.FitPeriod)
assert.Equal(t, 60, config.Recommend.SearchPeriod)
assert.Equal(t, 100, config.Recommend.SearchEpoch)
assert.Equal(t, 10, config.Recommend.SearchTrials)
assert.Equal(t, 1, config.Recommend.MaxRecommendPeriod)
assert.Equal(t, "latest", config.Recommend.FallbackRecommend)
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/go-redis/redis/v8 v8.8.0
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/protobuf v1.5.2
github.com/gorse-io/dashboard v0.0.0-20210617050250-ed1a1e88296e
github.com/gorse-io/dashboard v0.0.0-20210617071223-310b8c2d7b7a
github.com/haxii/go-swagger-ui v3.19.4+incompatible
github.com/json-iterator/go v1.1.10
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorse-io/dashboard v0.0.0-20210617050250-ed1a1e88296e h1:h46/luNlhNK9N6lotG75H6wSTNArE0ZHTwzOuA0ETQc=
github.com/gorse-io/dashboard v0.0.0-20210617050250-ed1a1e88296e/go.mod h1:14j0KVMmOAAN0xvsa2wxFoV/X8Aa/gCeNFRLjtYP6+M=
github.com/gorse-io/dashboard v0.0.0-20210617071223-310b8c2d7b7a h1:JDM6ndCEU7lpCUAKtMV2XK9rgmoc+9V7ejIOrJ4XpnM=
github.com/gorse-io/dashboard v0.0.0-20210617071223-310b8c2d7b7a/go.mod h1:14j0KVMmOAAN0xvsa2wxFoV/X8Aa/gCeNFRLjtYP6+M=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
Expand Down
2 changes: 1 addition & 1 deletion master/recommend.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (m *Master) analyze() error {
if !existed.Has(date.String()) {
// compute the click-through rate
startTime := time.Now()
clickThroughRate, err := m.DataClient.GetClickThroughRate(date, "like", "read")
clickThroughRate, err := m.DataClient.GetClickThroughRate(date, m.GorseConfig.Database.ClickFeedbackTypes, m.GorseConfig.Database.ReadFeedbackType)
if err != nil {
return err
}
Expand Down
40 changes: 0 additions & 40 deletions misc/config_test/config.toml

This file was deleted.

2 changes: 1 addition & 1 deletion storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Database interface {
GetFeedback(cursor string, n int, timeLimit *time.Time, feedbackTypes ...string) (string, []Feedback, error)
InsertMeasurement(measurement Measurement) error
GetMeasurements(name string, n int) ([]Measurement, error)
GetClickThroughRate(date time.Time, positiveType, readType string) (float64, error)
GetClickThroughRate(date time.Time, positiveTypes []string, readType string) (float64, error)
CountActiveUsers(date time.Time) (int, error)
}

Expand Down
5 changes: 4 additions & 1 deletion storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,18 +436,21 @@ func testGetClickThroughRate(t *testing.T, db Database) {
// insert feedback
err := db.BatchInsertFeedback([]Feedback{
{FeedbackKey: FeedbackKey{"star", "1", "1"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"like", "1", "1"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "1", "1"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "1", "2"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "1", "3"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "1", "4"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"star", "2", "1"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"star", "2", "3"}, Timestamp: time.Date(2001, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "2", "1"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "2", "2"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"read", "3", "2"}, Timestamp: time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC)},
{FeedbackKey: FeedbackKey{"star", "3", "3"}, Timestamp: time.Date(2001, 10, 1, 0, 0, 0, 0, time.UTC)},
}, true, true)
assert.Nil(t, err)
// get click-through-rate
rate, err := db.GetClickThroughRate(time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC), "star", "read")
rate, err := db.GetClickThroughRate(time.Date(2000, 10, 1, 0, 0, 0, 0, time.UTC), []string{"star", "like"}, "read")
assert.Nil(t, err)
assert.Equal(t, 0.375, rate)
}
Expand Down
71 changes: 52 additions & 19 deletions storage/data/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,53 +595,86 @@ func (db *MongoDB) CountActiveUsers(date time.Time) (int, error) {
if err != nil {
return 0, err
}
// db.feedback.distinct("feedbackkey.itemid",{"timestamp":{"$gte":new Date("2020-01-01"),"$lt":new Date("2020-01-02")}}).length
return len(distinct), nil
}

// GetClickThroughRate computes the click-through-rate of a specified date.
func (db *MongoDB) GetClickThroughRate(date time.Time, positiveType, readType string) (float64, error) {
func (db *MongoDB) GetClickThroughRate(date time.Time, positiveTypes []string, readType string) (float64, error) {
ctx := context.Background()
c := db.client.Database(db.dbName).Collection("feedback")
aggregate, err := c.Aggregate(ctx, mongo.Pipeline{
// count read feedbacks
readCountAgg, err := c.Aggregate(ctx, mongo.Pipeline{
{{"$match", bson.M{
"timestamp": bson.M{
"$gte": time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC),
"$lt": time.Date(date.Year(), date.Month(), date.Day()+1, 0, 0, 0, 0, time.UTC),
},
"feedbackkey.feedbacktype": bson.M{
"$in": append([]string{readType}, positiveTypes...),
},
}}},
{{"$project", bson.M{
"feedbackkey": 1,
"is_positive": bson.M{"$cond": bson.A{bson.M{"$eq": bson.A{"$feedbackkey.feedbacktype", positiveType}}, 1, 0}},
"is_read": bson.M{"$cond": bson.A{bson.M{"$eq": bson.A{"$feedbackkey.feedbacktype", readType}}, 1, 0}},
"feedbackkey.userid": 1,
"feedbackkey.itemid": 1,
}}},
{{"$group", bson.M{
"_id": "$feedbackkey.userid",
"positive_count": bson.M{"$sum": "$is_positive"},
"read_count": bson.M{"$sum": "$is_read"},
"_id": "$feedbackkey",
}}},
{{"$group", bson.M{
"_id": "$_id.userid",
"read_count": bson.M{"$sum": 1},
}}},
})
if err != nil {
return 0, err
}
readCount := make(map[string]int32)
for readCountAgg.Next(ctx) {
var ret bson.D
err = readCountAgg.Decode(&ret)
if err != nil {
return 0, err
}
readCount[ret.Map()["_id"].(string)] = ret.Map()["read_count"].(int32)
}
// count positive feedbacks
feedbackCountAgg, err := c.Aggregate(ctx, mongo.Pipeline{
{{"$match", bson.M{
"positive_count": bson.M{"$gt": 0}},
}},
"timestamp": bson.M{
"$gte": time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC),
"$lt": time.Date(date.Year(), date.Month(), date.Day()+1, 0, 0, 0, 0, time.UTC),
},
"feedbackkey.feedbacktype": bson.M{
"$in": positiveTypes,
},
}}},
{{"$project", bson.M{
"_id": 1,
"click_through_rate": bson.M{"$divide": bson.A{"$positive_count", "$read_count"}},
"feedbackkey.userid": 1,
"feedbackkey.itemid": 1,
}}},
{{"$group", bson.M{
"_id": nil,
"click_through_rate": bson.M{"$avg": "$click_through_rate"},
"_id": "$feedbackkey",
}}},
{{"$group", bson.M{
"_id": "$_id.userid",
"positive_count": bson.M{"$sum": 1},
}}},
})
if err != nil {
return 0, err
}
if aggregate.Next(ctx) {
sum, count := 0.0, 0.0
for feedbackCountAgg.Next(ctx) {
var ret bson.D
err = aggregate.Decode(&ret)
err = feedbackCountAgg.Decode(&ret)
if err != nil {
return 0, err
}
return ret.Map()["click_through_rate"].(float64), nil
count++
sum += float64(ret.Map()["positive_count"].(int32)) / float64(readCount[ret.Map()["_id"].(string)])
}
if count > 0 {
sum /= count
}
return 0, nil
return sum, err
}
2 changes: 1 addition & 1 deletion storage/data/no_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (NoDatabase) GetMeasurements(name string, n int) ([]Measurement, error) {
}

// GetClickThroughRate method of NoDatabase returns ErrNoDatabase.
func (NoDatabase) GetClickThroughRate(date time.Time, positiveType, readType string) (float64, error) {
func (NoDatabase) GetClickThroughRate(date time.Time, positiveTypes []string, readType string) (float64, error) {
return 0, ErrNoDatabase
}

Expand Down
2 changes: 1 addition & 1 deletion storage/data/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func (r *Redis) DeleteUserItemFeedback(userId, itemId string, feedbackTypes ...s
}

// GetClickThroughRate method of Redis returns ErrUnsupported.
func (r *Redis) GetClickThroughRate(date time.Time, positiveType, readType string) (float64, error) {
func (r *Redis) GetClickThroughRate(date time.Time, positiveTypes []string, readType string) (float64, error) {
return 0, ErrUnsupported
}

Expand Down
32 changes: 27 additions & 5 deletions storage/data/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,33 @@ func (d *SQLDatabase) DeleteUserItemFeedback(userId, itemId string, feedbackType
}

// GetClickThroughRate computes the click-through-rate of a specified date.
func (d *SQLDatabase) GetClickThroughRate(date time.Time, positiveType, readType string) (float64, error) {
rs, err := d.db.Query("SELECT SUM(feedback_type IN (?)) / SUM(feedback_type IN (?)) "+
"FROM feedback where DATE(time_stamp) = DATE(?) "+
"GROUP BY user_id HAVING SUM(feedback_type IN (?)) > 0 AND SUM(feedback_type IN (?)) > 0",
positiveType, readType, date, positiveType, readType)
func (d *SQLDatabase) GetClickThroughRate(date time.Time, positiveTypes []string, readType string) (float64, error) {
builder := strings.Builder{}
var args []interface{}
builder.WriteString("SELECT positive_count.positive_count / read_count.read_count FROM (")
builder.WriteString("SELECT user_id, COUNT(*) AS positive_count FROM (")
builder.WriteString("SELECT DISTINCT user_id, item_id FROM feedback WHERE DATE(time_stamp) = DATE(?) AND feedback_type IN (")
args = append(args, date)
for i, positiveType := range positiveTypes {
if i > 0 {
builder.WriteString(",")
}
builder.WriteString("?")
args = append(args, positiveType)
}
builder.WriteString(")) AS positive_feedback GROUP BY user_id) AS positive_count ")
builder.WriteString("JOIN (")
builder.WriteString("SELECT user_id, COUNT(*) AS read_count FROM (")
builder.WriteString("SELECT DISTINCT user_id, item_id FROM feedback WHERE DATE(time_stamp) = DATE(?) AND feedback_type IN (?")
args = append(args, date, readType)
for _, positiveType := range positiveTypes {
builder.WriteString(",?")
args = append(args, positiveType)
}
builder.WriteString(")) AS read_feedback GROUP BY user_id) AS read_count ")
builder.WriteString("ON positive_count.user_id = read_count.user_id")
base.Logger().Info("get click through rate from MySQL", zap.String("query", builder.String()))
rs, err := d.db.Query(builder.String(), args...)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit e4506a1

Please sign in to comment.