Skip to content

Commit

Permalink
set scroll time
Browse files Browse the repository at this point in the history
  • Loading branch information
andy committed Aug 1, 2024
1 parent 742f751 commit 064617e
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 29 deletions.
1 change: 1 addition & 0 deletions config/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type TaskCfg struct {
TaskAction TaskAction `mapstructure:"action"`
Force bool `mapstructure:"force"`
ScrollSize uint `mapstructure:"scroll_size"`
ScrollTime uint `mapstructure:"scroll_time"`
Parallelism uint `mapstructure:"parallelism"`
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/es/es0.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type ES interface {
GetClusterVersion() string
IndexExisted(index string) (bool, error)
GetIndexes() ([]string, error)
SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int,
yield func(*ScrollResultYield)) error
SearchByScroll(ctx context.Context, index string, query map[string]interface{},
sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error

BulkInsert(index string, hitDocs []Doc) error
BulkUpdate(index string, hitDocs []Doc) error
Expand Down
9 changes: 5 additions & 4 deletions pkg/es/es5.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/CharellKing/ela/config"
"github.com/CharellKing/ela/utils"
"github.com/mitchellh/mapstructure"
"github.com/spf13/cast"
"io"
"log"
"strings"
Expand Down Expand Up @@ -68,12 +69,12 @@ type ScrollResultV5 struct {
} `json:"_shards,omitempty"`
}

func (es *V5) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int,
yield func(*ScrollResultYield)) error {
func (es *V5) SearchByScroll(ctx context.Context, index string, query map[string]interface{},
sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error {
scrollSearchOptions := []func(*esapi.SearchRequest){
es.Search.WithIndex(index),
es.Search.WithSize(size),
es.Search.WithScroll(time.Minute),
es.Search.WithSize(cast.ToInt(scrollSize)),
es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute),
}

if len(query) > 0 {
Expand Down
9 changes: 5 additions & 4 deletions pkg/es/es6.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/CharellKing/ela/config"
"github.com/CharellKing/ela/utils"
"github.com/mitchellh/mapstructure"
"github.com/spf13/cast"
"io"
"log"
"strings"
Expand Down Expand Up @@ -45,12 +46,12 @@ func (es *V6) GetClusterVersion() string {
return es.ClusterVersion
}

func (es *V6) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int,
yield func(*ScrollResultYield)) error {
func (es *V6) SearchByScroll(ctx context.Context, index string, query map[string]interface{},
sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error {
scrollSearchOptions := []func(*esapi.SearchRequest){
es.Search.WithIndex(index),
es.Search.WithSize(size),
es.Search.WithScroll(time.Minute),
es.Search.WithSize(cast.ToInt(scrollSize)),
es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute),
}

if len(query) > 0 {
Expand Down
9 changes: 5 additions & 4 deletions pkg/es/es7.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/CharellKing/ela/config"
"github.com/CharellKing/ela/utils"
"github.com/mitchellh/mapstructure"
"github.com/spf13/cast"
"io"
"log"
"strings"
Expand Down Expand Up @@ -70,12 +71,12 @@ type ScrollResultV7 struct {
} `json:"_shards,omitempty"`
}

func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int,
yield func(*ScrollResultYield)) error {
func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string]interface{},
sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error {
scrollSearchOptions := []func(*esapi.SearchRequest){
es.Search.WithIndex(index),
es.Search.WithSize(size),
es.Search.WithScroll(time.Minute),
es.Search.WithSize(cast.ToInt(scrollSize)),
es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute),
}

if len(query) > 0 {
Expand Down
9 changes: 5 additions & 4 deletions pkg/es/es8.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/CharellKing/ela/config"
"github.com/CharellKing/ela/utils"
"github.com/mitchellh/mapstructure"
"github.com/spf13/cast"
"io"
"log"
"strings"
Expand Down Expand Up @@ -71,12 +72,12 @@ type ScrollResultV8 struct {
} `json:"_shards,omitempty"`
}

func (es *V8) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int,
yield func(*ScrollResultYield)) error {
func (es *V8) SearchByScroll(ctx context.Context, index string, query map[string]interface{},
sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error {
scrollSearchOptions := []func(*esapi.SearchRequest){
es.Search.WithIndex(index),
es.Search.WithSize(size),
es.Search.WithScroll(time.Minute),
es.Search.WithSize(cast.ToInt(scrollSize)),
es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute),
}

if len(query) > 0 {
Expand Down
1 change: 1 addition & 0 deletions service/bulkmigrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

const defaultScrollSize = 1000
const defaultParallelism = 12
const defaultScrollTime = 10

type BulkMigrator struct {
ctx context.Context
Expand Down
34 changes: 23 additions & 11 deletions service/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Migrator struct {
IndexPair config.IndexPair

ScrollSize uint
ScrollTime uint
}

func NewMigrator(ctx context.Context, srcConfig *config.ESConfig, dstConfig *config.ESConfig) (*Migrator, error) {
Expand All @@ -44,6 +45,7 @@ func NewMigrator(ctx context.Context, srcConfig *config.ESConfig, dstConfig *con
SourceES: srcES,
TargetES: dstES,
ScrollSize: defaultScrollSize,
ScrollTime: defaultScrollTime,
}, nil
}

Expand Down Expand Up @@ -73,6 +75,16 @@ func (m *Migrator) WithScrollSize(scrollSize uint) *Migrator {
}
}

func (m *Migrator) WithScrollTime(scrollTime uint) *Migrator {
return &Migrator{
SourceES: m.SourceES,
TargetES: m.TargetES,
IndexPair: m.IndexPair,
ScrollSize: m.ScrollSize,
ScrollTime: scrollTime,
}
}

func (m *Migrator) CopyIndexSettings(force bool) error {
existed, err := m.TargetES.IndexExisted(m.IndexPair.TargetIndex)
if err != nil {
Expand Down Expand Up @@ -124,7 +136,7 @@ func (m *Migrator) SyncDiff() ([3][]utils.HashDiff, error) {
},
}

if err := m.syncInsert(queryMap, cast.ToInt(m.ScrollSize)); err != nil {
if err := m.syncInsert(queryMap); err != nil {
return diffs, errors.WithStack(err)
}
}
Expand All @@ -148,7 +160,7 @@ func (m *Migrator) SyncDiff() ([3][]utils.HashDiff, error) {
},
}

if err := m.syncUpdate(queryMap, cast.ToInt(m.ScrollSize)); err != nil {
if err := m.syncUpdate(queryMap); err != nil {
return diffs, errors.WithStack(err)
}
}
Expand All @@ -161,7 +173,7 @@ func (m *Migrator) Compare() ([3][]utils.HashDiff, error) {

sourceCh := lo.Async(func() error {
var err error
sourceDocHashMap, err = m.getDocsHashValues(m.SourceES, m.IndexPair.SourceIndex, cast.ToInt(m.ScrollSize))
sourceDocHashMap, err = m.getDocsHashValues(m.SourceES, m.IndexPair.SourceIndex)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -170,7 +182,7 @@ func (m *Migrator) Compare() ([3][]utils.HashDiff, error) {

targetCh := lo.Async(func() error {
var err error
targetDocHashMap, err = m.getDocsHashValues(m.TargetES, m.IndexPair.TargetIndex, cast.ToInt(m.ScrollSize))
targetDocHashMap, err = m.getDocsHashValues(m.TargetES, m.IndexPair.TargetIndex)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -186,12 +198,12 @@ func (m *Migrator) Sync(force bool) error {
if err := m.CopyIndexSettings(force); err != nil {
return errors.WithStack(err)
}
return m.syncInsert(nil, cast.ToInt(m.ScrollSize))
return m.syncInsert(nil)
}

func (m *Migrator) syncInsert(query map[string]interface{}, size int) error {
func (m *Migrator) syncInsert(query map[string]interface{}) error {
for v := range lo.Generator(1, func(yield func(*es2.ScrollResultYield)) {
if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", size, yield); err != nil {
if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", m.ScrollSize, m.ScrollTime, yield); err != nil {
utils.GetLogger(m.ctx).WithError(err).Error("search scroll")
}
}) {
Expand All @@ -204,9 +216,9 @@ func (m *Migrator) syncInsert(query map[string]interface{}, size int) error {
return nil
}

func (m *Migrator) syncUpdate(query map[string]interface{}, size int) error {
func (m *Migrator) syncUpdate(query map[string]interface{}) error {
for v := range lo.Generator(1, func(yield func(*es2.ScrollResultYield)) {
if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", size, yield); err != nil {
if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", m.ScrollSize, m.ScrollTime, yield); err != nil {
utils.GetLogger(m.GetCtx()).WithError(err).Error("search by scroll")
}
}) {
Expand All @@ -226,10 +238,10 @@ func (m *Migrator) syncDelete(hitDocs []es2.Doc) error {
return nil
}

func (m *Migrator) getDocsHashValues(esInstance es2.ES, index string, size int) (map[string]*utils.DocHash, error) {
func (m *Migrator) getDocsHashValues(esInstance es2.ES, index string) (map[string]*utils.DocHash, error) {
docHashMap := make(map[string]*utils.DocHash)
for v := range lo.Generator(1, func(yield func(*es2.ScrollResultYield)) {
if err := esInstance.SearchByScroll(m.GetCtx(), index, nil, "", size, yield); err != nil {
if err := esInstance.SearchByScroll(m.GetCtx(), index, nil, "", m.ScrollSize, m.ScrollTime, yield); err != nil {
utils.GetLogger(m.ctx).WithError(err).Error("search by scroll")
}
}) {
Expand Down

0 comments on commit 064617e

Please sign in to comment.