Skip to content

Commit

Permalink
optimize data import and export (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
Somiacao authored Sep 25, 2021
1 parent 06f604d commit c81d777
Showing 1 changed file with 66 additions and 54 deletions.
120 changes: 66 additions & 54 deletions master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package master
import (
"bufio"
"fmt"
"github.com/juju/errors"
"github.com/zhenghaoz/gorse/model/click"
"github.com/zhenghaoz/gorse/model/ranking"
"io"
Expand Down Expand Up @@ -469,25 +470,19 @@ func (m *Master) importExportUsers(response http.ResponseWriter, request *http.R
return
}
// write rows
var cursor string
const batchSize = 1024
for {
var users []data.User
cursor, users, err = m.DataClient.GetUsers(cursor, batchSize)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
userChan, errChan := m.DataClient.GetUserStream(batchSize)
for users := range userChan {
for _, user := range users {
if _, err = response.Write([]byte(fmt.Sprintf("%s,%s\r\n",
base.Escape(user.UserId), base.Escape(strings.Join(user.Labels, "|"))))); err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
}
if cursor == "" {
break
}
}
if err = <-errChan; err != nil {
server.InternalServerError(restful.NewResponse(response), errors.Trace(err))
return
}
case http.MethodPost:
hasHeader := formValue(request, "has-header", "true") == "true"
Expand All @@ -512,7 +507,7 @@ func (m *Master) importExportUsers(response http.ResponseWriter, request *http.R
func (m *Master) importUsers(response http.ResponseWriter, file io.Reader, hasHeader bool, sep, labelSep, fmtString string) {
lineCount := 0
timeStart := time.Now()
//users := make([]data.User, 0)
users := make([]data.User, 0)
err := base.ReadLines(bufio.NewScanner(file), sep, func(lineNumber int, splits []string) bool {
var err error
// skip header
Expand Down Expand Up @@ -543,10 +538,15 @@ func (m *Master) importUsers(response http.ResponseWriter, file io.Reader, hasHe
}
}
}
err = m.DataClient.BatchInsertUsers([]data.User{user})
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return false
users = append(users, user)
// batch insert
if len(users) == batchSize {
err = m.DataClient.BatchInsertUsers(users)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return false
}
users = nil
}
lineCount++
return true
Expand All @@ -555,6 +555,13 @@ func (m *Master) importUsers(response http.ResponseWriter, file io.Reader, hasHe
server.BadRequest(restful.NewResponse(response), err)
return
}
if len(users) > 0 {
err = m.DataClient.BatchInsertUsers(users)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
}
timeUsed := time.Since(timeStart)
base.Logger().Info("complete import users",
zap.Duration("time_used", timeUsed),
Expand All @@ -574,25 +581,19 @@ func (m *Master) importExportItems(response http.ResponseWriter, request *http.R
return
}
// write rows
var cursor string
const batchSize = 1024
for {
var items []data.Item
cursor, items, err = m.DataClient.GetItems(cursor, batchSize, nil)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
itemChan, errChan := m.DataClient.GetItemStream(batchSize, nil)
for items := range itemChan {
for _, item := range items {
if _, err = response.Write([]byte(fmt.Sprintf("%s,%v,%s,%s\r\n",
base.Escape(item.ItemId), item.Timestamp, base.Escape(strings.Join(item.Labels, "|")), base.Escape(item.Comment)))); err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
}
if cursor == "" {
break
}
}
if err = <-errChan; err != nil {
server.InternalServerError(restful.NewResponse(response), errors.Trace(err))
return
}
case http.MethodPost:
hasHeader := formValue(request, "has-header", "true") == "true"
Expand Down Expand Up @@ -660,22 +661,28 @@ func (m *Master) importItems(response http.ResponseWriter, file io.Reader, hasHe
// 4. comment
item.Comment = splits[3]
items = append(items, item)
//err = m.DataClient.insertItem(item)
//if err != nil {
// server.InternalServerError(restful.NewResponse(response), err)
// return false
//}
// batch insert
if len(items) == batchSize {
err = m.DataClient.BatchInsertItems(items)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return false
}
items = nil
}
lineCount++
return true
})
if err != nil {
server.BadRequest(restful.NewResponse(response), err)
return
}
err = m.DataClient.BatchInsertItems(items)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
if len(items) > 0 {
err = m.DataClient.BatchInsertItems(items)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
}
timeUsed := time.Since(timeStart)
base.Logger().Info("complete import items",
Expand Down Expand Up @@ -725,25 +732,19 @@ func (m *Master) importExportFeedback(response http.ResponseWriter, request *htt
return
}
// write rows
var cursor string
const batchSize = 1024
for {
var feedback []data.Feedback
cursor, feedback, err = m.DataClient.GetFeedback(cursor, batchSize, nil)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
feedbackChan, errChan := m.DataClient.GetFeedbackStream(batchSize, nil)
for feedback := range feedbackChan {
for _, v := range feedback {
if _, err = response.Write([]byte(fmt.Sprintf("%s,%s,%s,%v\r\n",
base.Escape(v.FeedbackType), base.Escape(v.UserId), base.Escape(v.ItemId), v.Timestamp))); err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
}
if cursor == "" {
break
}
}
if err = <-errChan; err != nil {
server.InternalServerError(restful.NewResponse(response), errors.Trace(err))
return
}
case http.MethodPost:
hasHeader := formValue(request, "has-header", "true") == "true"
Expand Down Expand Up @@ -812,6 +813,15 @@ func (m *Master) importFeedback(response http.ResponseWriter, file io.Reader, ha
fmt.Errorf("failed to parse datetime `%v` at line %d", splits[3], lineCount))
}
feedbacks = append(feedbacks, feedback)
// batch insert
if len(feedbacks) == batchSize {
err = m.InsertFeedbackToCache(feedbacks)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
feedbacks = nil
}
lineCount++
}
if err = scanner.Err(); err != nil {
Expand All @@ -827,10 +837,12 @@ func (m *Master) importFeedback(response http.ResponseWriter, file io.Reader, ha
return
}
// insert to cache store
err = m.InsertFeedbackToCache(feedbacks)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
if len(feedbacks) > 0 {
err = m.InsertFeedbackToCache(feedbacks)
if err != nil {
server.InternalServerError(restful.NewResponse(response), err)
return
}
}
timeUsed := time.Since(timeStart)
base.Logger().Info("complete import feedback",
Expand Down

0 comments on commit c81d777

Please sign in to comment.