Skip to content

Commit

Permalink
test es5->es7 es7->es8
Browse files Browse the repository at this point in the history
  • Loading branch information
andy committed Jul 31, 2024
1 parent 9b96c77 commit 742f751
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ func main() {
utils.GetLogger(ctx).WithError(err).Error("create task manager")
return
}
taskMgr.Run(ctx)
if err := taskMgr.Run(ctx); err != nil {
utils.GetLogger(ctx).WithError(err).Error("run task manager")
return
}

return
}
2 changes: 1 addition & 1 deletion pkg/es/es5settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (v5 *V5Settings) ToESV6Mapping(targetIndex string) map[string]interface{} {
}

func (v5 *V5Settings) ToESV7Mapping(targetIndex string) map[string]interface{} {
return v5.ToESV5Mapping(targetIndex)
return v5.ToESV8Mapping()
}

func (v5 *V5Settings) ToESV8Mapping() map[string]interface{} {
Expand Down
34 changes: 29 additions & 5 deletions pkg/es/es7.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ func (es *V7) GetClusterVersion() string {
return es.ClusterVersion
}

type ScrollResultV7 struct {
Took int `json:"took,omitempty"`
ScrollId string `json:"_scroll_id,omitempty"`
TimedOut bool `json:"timed_out,omitempty"`
Hits struct {
MaxScore float32 `json:"max_score,omitempty"`
Total struct {
Value int `json:"value,omitempty"`
Relation string `json:"relation,omitempty"`
} `json:"total,omitempty"`
Docs []interface{} `json:"hits,omitempty"`
} `json:"hits"`
Shards struct {
Successful int `json:"successful,omitempty"`
Skipped int `json:"skipped,omitempty"`
Failed int `json:"failed,omitempty"`
Failures []struct {
Shard int `json:"shard,omitempty"`
Index string `json:"index,omitempty"`
Status int `json:"status,omitempty"`
Reason interface{} `json:"reason,omitempty"`
} `json:"failures,omitempty"`
} `json:"_shards,omitempty"`
}

func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string]interface{}, sort string, size int,
yield func(*ScrollResultYield)) error {
scrollSearchOptions := []func(*esapi.SearchRequest){
Expand Down Expand Up @@ -76,7 +101,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string
return errors.New(res.String())
}

var scrollResult ScrollResultV5
var scrollResult ScrollResultV7
if err := json.NewDecoder(res.Body).Decode(&scrollResult); err != nil {
return errors.WithStack(err)
}
Expand All @@ -97,7 +122,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string
}

yield(&ScrollResultYield{
Total: uint64(scrollResult.Hits.Total),
Total: uint64(scrollResult.Hits.Total.Value),
Docs: hitDocs,
})

Expand All @@ -117,7 +142,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string
return errors.New(res.String())
}

var scrollResult ScrollResultV5
var scrollResult ScrollResultV7
if err := json.NewDecoder(res.Body).Decode(&scrollResult); err != nil {
return errors.WithStack(err)
}
Expand All @@ -135,7 +160,7 @@ func (es *V7) SearchByScroll(ctx context.Context, index string, query map[string
}

yield(&ScrollResultYield{
Total: uint64(scrollResult.Hits.Total),
Total: uint64(scrollResult.Hits.Total.Value),
Docs: hitDocs,
})
return nil
Expand Down Expand Up @@ -218,7 +243,6 @@ func (es *V7) BulkInsert(index string, hitDocs []Doc) error {
"index": map[string]interface{}{
"_index": index,
"_id": doc.ID,
"_type": doc.Type,
},
}
metaBytes, _ := json.Marshal(meta)
Expand Down
1 change: 0 additions & 1 deletion pkg/es/es8.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (es *V8) BulkInsert(index string, hitDocs []Doc) error {
"index": map[string]interface{}{
"_index": index,
"_id": doc.ID,
"_type": doc.Type,
},
}
metaBytes, _ := json.Marshal(meta)
Expand Down

0 comments on commit 742f751

Please sign in to comment.