diff --git a/config/cfg.go b/config/cfg.go index 358053c..ce0758d 100644 --- a/config/cfg.go +++ b/config/cfg.go @@ -18,6 +18,7 @@ type TaskCfg struct { TaskAction TaskAction `mapstructure:"action"` Force bool `mapstructure:"force"` ScrollSize uint `mapstructure:"scroll_size"` + ScrollTime uint `mapstructure:"scroll_time"` Parallelism uint `mapstructure:"parallelism"` } diff --git a/pkg/es/es0.go b/pkg/es/es0.go index a11b37b..45304bd 100644 --- a/pkg/es/es0.go +++ b/pkg/es/es0.go @@ -27,8 +27,8 @@ type ES interface { GetClusterVersion() string IndexExisted(index string) (bool, error) GetIndexes() ([]string, error) - SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int, - yield func(*ScrollResultYield)) error + SearchByScroll(ctx context.Context, index string, query map[string]interface{}, + sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error BulkInsert(index string, hitDocs []Doc) error BulkUpdate(index string, hitDocs []Doc) error diff --git a/pkg/es/es5.go b/pkg/es/es5.go index df31ea1..44cf47d 100644 --- a/pkg/es/es5.go +++ b/pkg/es/es5.go @@ -9,6 +9,7 @@ import ( "github.com/CharellKing/ela/config" "github.com/CharellKing/ela/utils" "github.com/mitchellh/mapstructure" + "github.com/spf13/cast" "io" "log" "strings" @@ -68,12 +69,12 @@ type ScrollResultV5 struct { } `json:"_shards,omitempty"` } -func (es *V5) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int, - yield func(*ScrollResultYield)) error { +func (es *V5) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, + sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error { scrollSearchOptions := []func(*esapi.SearchRequest){ es.Search.WithIndex(index), - es.Search.WithSize(size), - es.Search.WithScroll(time.Minute), + es.Search.WithSize(cast.ToInt(scrollSize)), + es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute), } if len(query) > 0 { diff --git a/pkg/es/es6.go b/pkg/es/es6.go index 39731d8..15b4edd 100644 --- a/pkg/es/es6.go +++ b/pkg/es/es6.go @@ -9,6 +9,7 @@ import ( "github.com/CharellKing/ela/config" "github.com/CharellKing/ela/utils" "github.com/mitchellh/mapstructure" + "github.com/spf13/cast" "io" "log" "strings" @@ -45,12 +46,12 @@ func (es *V6) GetClusterVersion() string { return es.ClusterVersion } -func (es *V6) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int, - yield func(*ScrollResultYield)) error { +func (es *V6) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, + sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error { scrollSearchOptions := []func(*esapi.SearchRequest){ es.Search.WithIndex(index), - es.Search.WithSize(size), - es.Search.WithScroll(time.Minute), + es.Search.WithSize(cast.ToInt(scrollSize)), + es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute), } if len(query) > 0 { diff --git a/pkg/es/es7.go b/pkg/es/es7.go index 42990a1..c39ac20 100644 --- a/pkg/es/es7.go +++ b/pkg/es/es7.go @@ -9,6 +9,7 @@ import ( "github.com/CharellKing/ela/config" "github.com/CharellKing/ela/utils" "github.com/mitchellh/mapstructure" + "github.com/spf13/cast" "io" "log" "strings" @@ -70,12 +71,12 @@ type ScrollResultV7 struct { } `json:"_shards,omitempty"` } -func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int, - yield func(*ScrollResultYield)) error { +func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, + sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error { scrollSearchOptions := []func(*esapi.SearchRequest){ es.Search.WithIndex(index), - es.Search.WithSize(size), - es.Search.WithScroll(time.Minute), + es.Search.WithSize(cast.ToInt(scrollSize)), + es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute), } if len(query) > 0 { diff --git a/pkg/es/es8.go b/pkg/es/es8.go index 3395c6b..3c5302b 100644 --- a/pkg/es/es8.go +++ b/pkg/es/es8.go @@ -9,6 +9,7 @@ import ( "github.com/CharellKing/ela/config" "github.com/CharellKing/ela/utils" "github.com/mitchellh/mapstructure" + "github.com/spf13/cast" "io" "log" "strings" @@ -71,12 +72,12 @@ type ScrollResultV8 struct { } `json:"_shards,omitempty"` } -func (es *V8) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int, - yield func(*ScrollResultYield)) error { +func (es *V8) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, + sort string, scrollSize uint, scrollTime uint, yield func(*ScrollResultYield)) error { scrollSearchOptions := []func(*esapi.SearchRequest){ es.Search.WithIndex(index), - es.Search.WithSize(size), - es.Search.WithScroll(time.Minute), + es.Search.WithSize(cast.ToInt(scrollSize)), + es.Search.WithScroll(cast.ToDuration(scrollTime) * time.Minute), } if len(query) > 0 { diff --git a/service/bulkmigrator.go b/service/bulkmigrator.go index c423a20..95611d3 100644 --- a/service/bulkmigrator.go +++ b/service/bulkmigrator.go @@ -16,6 +16,7 @@ import ( const defaultScrollSize = 1000 const defaultParallelism = 12 +const defaultScrollTime = 10 type BulkMigrator struct { ctx context.Context diff --git a/service/migrator.go b/service/migrator.go index b38d2a3..3eb6a9b 100644 --- a/service/migrator.go +++ b/service/migrator.go @@ -24,6 +24,7 @@ type Migrator struct { IndexPair config.IndexPair ScrollSize uint + ScrollTime uint } func NewMigrator(ctx context.Context, srcConfig *config.ESConfig, dstConfig *config.ESConfig) (*Migrator, error) { @@ -44,6 +45,7 @@ func NewMigrator(ctx context.Context, srcConfig *config.ESConfig, dstConfig *con SourceES: srcES, TargetES: dstES, ScrollSize: defaultScrollSize, + ScrollTime: defaultScrollTime, }, nil } @@ -73,6 +75,16 @@ func (m *Migrator) WithScrollSize(scrollSize uint) *Migrator { } } +func (m *Migrator) WithScrollTime(scrollTime uint) *Migrator { + return &Migrator{ + SourceES: m.SourceES, + TargetES: m.TargetES, + IndexPair: m.IndexPair, + ScrollSize: m.ScrollSize, + ScrollTime: scrollTime, + } +} + func (m *Migrator) CopyIndexSettings(force bool) error { existed, err := m.TargetES.IndexExisted(m.IndexPair.TargetIndex) if err != nil { @@ -124,7 +136,7 @@ func (m *Migrator) SyncDiff() ([3][]utils.HashDiff, error) { }, } - if err := m.syncInsert(queryMap, cast.ToInt(m.ScrollSize)); err != nil { + if err := m.syncInsert(queryMap); err != nil { return diffs, errors.WithStack(err) } } @@ -148,7 +160,7 @@ func (m *Migrator) SyncDiff() ([3][]utils.HashDiff, error) { }, } - if err := m.syncUpdate(queryMap, cast.ToInt(m.ScrollSize)); err != nil { + if err := m.syncUpdate(queryMap); err != nil { return diffs, errors.WithStack(err) } } @@ -161,7 +173,7 @@ func (m *Migrator) Compare() ([3][]utils.HashDiff, error) { sourceCh := lo.Async(func() error { var err error - sourceDocHashMap, err = m.getDocsHashValues(m.SourceES, m.IndexPair.SourceIndex, cast.ToInt(m.ScrollSize)) + sourceDocHashMap, err = m.getDocsHashValues(m.SourceES, m.IndexPair.SourceIndex) if err != nil { return errors.WithStack(err) } @@ -170,7 +182,7 @@ func (m *Migrator) Compare() ([3][]utils.HashDiff, error) { targetCh := lo.Async(func() error { var err error - targetDocHashMap, err = m.getDocsHashValues(m.TargetES, m.IndexPair.TargetIndex, cast.ToInt(m.ScrollSize)) + targetDocHashMap, err = m.getDocsHashValues(m.TargetES, m.IndexPair.TargetIndex) if err != nil { return errors.WithStack(err) } @@ -186,12 +198,12 @@ func (m *Migrator) Sync(force bool) error { if err := m.CopyIndexSettings(force); err != nil { return errors.WithStack(err) } - return m.syncInsert(nil, cast.ToInt(m.ScrollSize)) + return m.syncInsert(nil) } -func (m *Migrator) syncInsert(query map[string]interface{}, size int) error { +func (m *Migrator) syncInsert(query map[string]interface{}) error { for v := range lo.Generator(1, func(yield func(*es2.ScrollResultYield)) { - if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", size, yield); err != nil { + if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", m.ScrollSize, m.ScrollTime, yield); err != nil { utils.GetLogger(m.ctx).WithError(err).Error("search scroll") } }) { @@ -204,9 +216,9 @@ func (m *Migrator) syncInsert(query map[string]interface{}, size int) error { return nil } -func (m *Migrator) syncUpdate(query map[string]interface{}, size int) error { +func (m *Migrator) syncUpdate(query map[string]interface{}) error { for v := range lo.Generator(1, func(yield func(*es2.ScrollResultYield)) { - if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", size, yield); err != nil { + if err := m.SourceES.SearchByScroll(m.GetCtx(), m.IndexPair.SourceIndex, query, "", m.ScrollSize, m.ScrollTime, yield); err != nil { utils.GetLogger(m.GetCtx()).WithError(err).Error("search by scroll") } }) { @@ -226,10 +238,10 @@ func (m *Migrator) syncDelete(hitDocs []es2.Doc) error { return nil } -func (m *Migrator) getDocsHashValues(esInstance es2.ES, index string, size int) (map[string]*utils.DocHash, error) { +func (m *Migrator) getDocsHashValues(esInstance es2.ES, index string) (map[string]*utils.DocHash, error) { docHashMap := make(map[string]*utils.DocHash) for v := range lo.Generator(1, func(yield func(*es2.ScrollResultYield)) { - if err := esInstance.SearchByScroll(m.GetCtx(), index, nil, "", size, yield); err != nil { + if err := esInstance.SearchByScroll(m.GetCtx(), index, nil, "", m.ScrollSize, m.ScrollTime, yield); err != nil { utils.GetLogger(m.ctx).WithError(err).Error("search by scroll") } }) {